You are viewing a plain-text version of this content. The canonical (linked) version is available in the original mailing-list archive.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2017/08/16 00:28:07 UTC
[13/50] [abbrv] hadoop git commit: MAPREDUCE-6870. Add configuration
for MR job to finish when all reducers are complete. (Peter Bacsko via Haibo
Chen)
MAPREDUCE-6870. Add configuration for MR job to finish when all reducers are complete. (Peter Bacsko via Haibo Chen)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a32e0138
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a32e0138
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a32e0138
Branch: refs/heads/YARN-6592
Commit: a32e0138fb63c92902e6613001f38a87c8a41321
Parents: 312e57b
Author: Haibo Chen <ha...@apache.org>
Authored: Thu Aug 10 15:17:36 2017 -0700
Committer: Haibo Chen <ha...@apache.org>
Committed: Thu Aug 10 15:17:36 2017 -0700
----------------------------------------------------------------------
.../mapreduce/v2/app/job/impl/JobImpl.java | 35 ++++-
.../mapreduce/v2/app/job/impl/TestJobImpl.java | 139 +++++++++++++++----
.../apache/hadoop/mapreduce/MRJobConfig.java | 6 +-
.../src/main/resources/mapred-default.xml | 8 ++
4 files changed, 160 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index 4d155d0..6880b6c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -644,6 +644,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private float reduceProgress;
private float cleanupProgress;
private boolean isUber = false;
+ private boolean finishJobWhenReducersDone;
+ private boolean completingJob = false;
private Credentials jobCredentials;
private Token<JobTokenIdentifier> jobToken;
@@ -717,6 +719,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
this.maxFetchFailuresNotifications = conf.getInt(
MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS,
MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS);
+ this.finishJobWhenReducersDone = conf.getBoolean(
+ MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE,
+ MRJobConfig.DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE);
}
protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
@@ -2021,7 +2026,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
TimeUnit.MILLISECONDS);
return JobStateInternal.FAIL_WAIT;
}
-
+
+ checkReadyForCompletionWhenAllReducersDone(job);
+
return job.checkReadyForCommit();
}
@@ -2052,6 +2059,32 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
}
job.metrics.killedTask(task);
}
+
+ /** If the job has reducers and its completed reduce count equals the
+ total, send T_KILL to every map task that has not finished yet (e.g.
+ mappers rescheduled after their node became UNHEALTHY). Enabled by
+ MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE; see MAPREDUCE-6870. */
+ private void checkReadyForCompletionWhenAllReducersDone(JobImpl job) {
+ if (job.finishJobWhenReducersDone) {
+ int totalReduces = job.getTotalReduces();
+ int completedReduces = job.getCompletedReduces();
+ // completingJob guards against sending the kills more than once.
+ if (totalReduces > 0 && totalReduces == completedReduces
+ && !job.completingJob) {
+ // The Task interface exposes isFinished()/getID(); no downcast needed.
+ for (TaskId mapTaskId : job.mapTasks) {
+ Task task = job.tasks.get(mapTaskId);
+ if (!task.isFinished()) {
+ LOG.info("Killing map task " + task.getID());
+ job.eventHandler.handle(
+ new TaskEvent(task.getID(), TaskEventType.T_KILL));
+ }
+ }
+ // Mark the job so later calls do not re-issue T_KILL events.
+ job.completingJob = true;
+ }
+ }
+ }
}
// Transition class for handling jobs with no tasks
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
index 2147ec1..1827ce4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
@@ -564,33 +564,13 @@ public class TestJobImpl {
dispatcher.register(TaskAttemptEventType.class, taskAttemptEventHandler);
// replace the tasks with spied versions to return the right attempts
- Map<TaskId,Task> spiedTasks = new HashMap<TaskId,Task>();
- List<NodeReport> nodeReports = new ArrayList<NodeReport>();
- Map<NodeReport,TaskId> nodeReportsToTaskIds =
- new HashMap<NodeReport,TaskId>();
- for (Map.Entry<TaskId,Task> e: job.tasks.entrySet()) {
- TaskId taskId = e.getKey();
- Task task = e.getValue();
- if (taskId.getTaskType() == TaskType.MAP) {
- // add an attempt to the task to simulate nodes
- NodeId nodeId = mock(NodeId.class);
- TaskAttempt attempt = mock(TaskAttempt.class);
- when(attempt.getNodeId()).thenReturn(nodeId);
- TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
- when(attempt.getID()).thenReturn(attemptId);
- // create a spied task
- Task spied = spy(task);
- doReturn(attempt).when(spied).getAttempt(any(TaskAttemptId.class));
- spiedTasks.put(taskId, spied);
+ Map<TaskId, Task> spiedTasks = new HashMap<>();
+ List<NodeReport> nodeReports = new ArrayList<>();
+ Map<NodeReport, TaskId> nodeReportsToTaskIds = new HashMap<>();
+
+ createSpiedMapTasks(nodeReportsToTaskIds, spiedTasks, job,
+ NodeState.UNHEALTHY, nodeReports);
- // create a NodeReport based on the node id
- NodeReport report = mock(NodeReport.class);
- when(report.getNodeState()).thenReturn(NodeState.UNHEALTHY);
- when(report.getNodeId()).thenReturn(nodeId);
- nodeReports.add(report);
- nodeReportsToTaskIds.put(report, taskId);
- }
- }
// replace the tasks with the spied tasks
job.tasks.putAll(spiedTasks);
@@ -641,6 +621,82 @@ public class TestJobImpl {
commitHandler.stop();
}
+ @Test
+ public void testJobNCompletedWhenAllReducersAreFinished() throws Exception {
+ final boolean finishWhenReducersDone = true; // map tasks must get T_KILL
+ testJobCompletionWhenReducersAreFinished(finishWhenReducersDone);
+ }
+
+ @Test
+ public void testJobNotCompletedWhenAllReducersAreFinished() throws Exception {
+ final boolean finishWhenReducersDone = false; // no T_KILL expected
+ testJobCompletionWhenReducersAreFinished(finishWhenReducersDone);
+ }
+
+ /**
+ * Starts a stubbed job with two map tasks and one reducer, reports the
+ * reducer as SUCCEEDED and checks the T_KILL events sent to the map tasks.
+ *
+ * @param killMappers value of MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE;
+ * when true both map tasks must be killed, otherwise none
+ */
+ private void testJobCompletionWhenReducersAreFinished(boolean killMappers)
+ throws InterruptedException, BrokenBarrierException {
+ Configuration conf = new Configuration();
+ conf.setBoolean(MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE, killMappers);
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+ conf.setInt(MRJobConfig.NUM_REDUCES, 1);
+ DrainDispatcher dispatcher = new DrainDispatcher();
+ dispatcher.init(conf);
+ final List<TaskEvent> killedEvents =
+ Collections.synchronizedList(new ArrayList<TaskEvent>());
+ dispatcher.register(TaskEventType.class, new EventHandler<TaskEvent>() {
+ @Override
+ public void handle(TaskEvent event) {
+ if (event.getType() == TaskEventType.T_KILL) {
+ killedEvents.add(event);
+ }
+ }
+ });
+ dispatcher.start();
+ CyclicBarrier syncBarrier = new CyclicBarrier(2);
+ OutputCommitter committer = new TestingOutputCommitter(syncBarrier, true);
+ CommitterEventHandler commitHandler =
+ createCommitterEventHandler(dispatcher, committer);
+ commitHandler.init(conf);
+ commitHandler.start();
+
+ try {
+ final JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
+
+ // replace the tasks with spied versions to return the right attempts
+ Map<TaskId, Task> spiedTasks = new HashMap<>();
+ List<NodeReport> nodeReports = new ArrayList<>();
+ Map<NodeReport, TaskId> nodeReportsToTaskIds = new HashMap<>();
+
+ createSpiedMapTasks(nodeReportsToTaskIds, spiedTasks, job,
+ NodeState.RUNNING, nodeReports);
+
+ // replace the tasks with the spied tasks
+ job.tasks.putAll(spiedTasks);
+
+ // finish reducer
+ for (TaskId taskId: job.tasks.keySet()) {
+ if (taskId.getTaskType() == TaskType.REDUCE) {
+ job.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED));
+ }
+ }
+
+ dispatcher.await();
+
+ /*
+ * StubbedJob cannot finish in this test - we'd have to generate the
+ * necessary events in this test manually, but that wouldn't add too
+ * much value. Instead, we validate the T_KILL events.
+ */
+ if (killMappers) {
+ Assert.assertEquals("Number of killed events", 2, killedEvents.size());
+ Assert.assertEquals("TaskID", "task_1234567890000_0001_m_000000",
+ killedEvents.get(0).getTaskID().toString());
+ Assert.assertEquals("TaskID", "task_1234567890000_0001_m_000001",
+ killedEvents.get(1).getTaskID().toString());
+ } else {
+ Assert.assertEquals("Number of killed events", 0, killedEvents.size());
+ }
+ } finally {
+ // Stop the service threads even when an assertion fails.
+ dispatcher.stop();
+ commitHandler.stop();
+ }
+ }
+
public static void main(String[] args) throws Exception {
TestJobImpl t = new TestJobImpl();
t.testJobNoTasks();
@@ -1021,6 +1077,37 @@ public class TestJobImpl {
Assert.assertEquals(state, job.getInternalState());
}
+ /**
+ * Creates a Mockito spy for every map task of {@code job}. Each spy exposes
+ * a single mocked attempt on a mocked node, and a mocked NodeReport in state
+ * {@code nodeState} is created for that node. The caller is responsible for
+ * putting the spies back into {@code job.tasks}.
+ *
+ * @param nodeReportsToTaskIds filled with NodeReport -> map task id
+ * @param spiedTasks filled with map task id -> spied task
+ * @param job the job whose map tasks are spied
+ * @param nodeState state returned by every created NodeReport
+ * @param nodeReports filled with the created NodeReports
+ */
+ private void createSpiedMapTasks(Map<NodeReport, TaskId>
+ nodeReportsToTaskIds, Map<TaskId, Task> spiedTasks, JobImpl job,
+ NodeState nodeState, List<NodeReport> nodeReports) {
+ for (Map.Entry<TaskId, Task> e: job.tasks.entrySet()) {
+ TaskId taskId = e.getKey();
+ Task task = e.getValue();
+ if (taskId.getTaskType() == TaskType.MAP) {
+ // add an attempt to the task to simulate nodes
+ NodeId nodeId = mock(NodeId.class);
+ TaskAttempt attempt = mock(TaskAttempt.class);
+ when(attempt.getNodeId()).thenReturn(nodeId);
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+ when(attempt.getID()).thenReturn(attemptId);
+ // create a spied task; stub it with doReturn() so that the spy's real
+ // methods are not invoked while stubbing
+ Task spied = spy(task);
+ Map<TaskAttemptId, TaskAttempt> attemptMap = new HashMap<>();
+ attemptMap.put(attemptId, attempt);
+ doReturn(attemptMap).when(spied).getAttempts();
+ doReturn(attempt).when(spied).getAttempt(any(TaskAttemptId.class));
+ spiedTasks.put(taskId, spied);
+
+ // create a NodeReport based on the node id
+ NodeReport report = mock(NodeReport.class);
+ when(report.getNodeState()).thenReturn(nodeState);
+ when(report.getNodeId()).thenReturn(nodeId);
+ nodeReports.add(report);
+ nodeReportsToTaskIds.put(report, taskId);
+ }
+ }
+ }
+
private static class JobSubmittedEventHandler implements
EventHandler<JobHistoryEvent> {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index cfc1bcc..2023ba3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -431,7 +431,7 @@ public interface MRJobConfig {
public static final String JOB_ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
public static final String DEFAULT_JOB_ACL_MODIFY_JOB = " ";
-
+
public static final String JOB_RUNNING_MAP_LIMIT =
"mapreduce.job.running.map.limit";
public static final int DEFAULT_JOB_RUNNING_MAP_LIMIT = 0;
@@ -1033,4 +1033,8 @@ public interface MRJobConfig {
String MR_JOB_REDACTED_PROPERTIES = "mapreduce.job.redacted-properties";
String MR_JOB_SEND_TOKEN_CONF = "mapreduce.job.send-token-conf";
+
+ /**
+ * Whether a job with at least one reducer should finish once all of its
+ * reducers have completed, killing any map tasks that are still running.
+ */
+ String FINISH_JOB_WHEN_REDUCERS_DONE =
+ "mapreduce.job.finish-when-all-reducers-done";
+
+ /** Default value of {@link #FINISH_JOB_WHEN_REDUCERS_DONE}. */
+ boolean DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE = true;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 101aa07..ee9b906 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1126,6 +1126,14 @@
</property>
<property>
+ <name>mapreduce.job.finish-when-all-reducers-done</name>
+ <value>true</value>
+ <description>Specifies whether the job should complete once all reducers
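+ have finished, regardless of whether there are still running mappers.
+ When true, map tasks that are still running at that point (for example,
+ mappers rescheduled after their node became unhealthy) are killed. Has no
+ effect on jobs without reducers.
+ </description>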
+</property>
+
+<property>
<name>mapreduce.job.token.tracking.ids.enabled</name>
<value>false</value>
<description>Whether to write tracking ids of tokens to
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org