You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by bo...@apache.org on 2012/07/16 21:13:10 UTC
svn commit: r1362213 - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/
Author: bobby
Date: Mon Jul 16 19:13:10 2012
New Revision: 1362213
URL: http://svn.apache.org/viewvc?rev=1362213&view=rev
Log:
svn merge -c 1362209 FIXES: MAPREDUCE-4437. Race in MR ApplicationMaster can cause reducers to never be scheduled (Jason Lowe via bobby)
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1362213&r1=1362212&r2=1362213&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Mon Jul 16 19:13:10 2012
@@ -573,6 +573,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4299. Terasort hangs with MR2 FifoScheduler (Tom White via
bobby)
+ MAPREDUCE-4437. Race in MR ApplicationMaster can cause reducers to never be
+ scheduled (Jason Lowe via bobby)
+
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1362213&r1=1362212&r2=1362213&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Mon Jul 16 19:13:10 2012
@@ -21,7 +21,6 @@ package org.apache.hadoop.mapreduce.v2.a
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
-import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -47,9 +46,6 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
-import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.app.job.Task;
-import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
@@ -131,6 +127,7 @@ public class RMContainerAllocator extend
private int containersReleased = 0;
private int hostLocalAssigned = 0;
private int rackLocalAssigned = 0;
+ private int lastCompletedTasks = 0;
private boolean recalculateReduceSchedule = false;
private int mapResourceReqt;//memory
@@ -214,11 +211,18 @@ public class RMContainerAllocator extend
scheduledRequests.assign(allocatedContainers);
LOG.info("After Assign: " + getStat());
}
-
+
+ int completedMaps = getJob().getCompletedMaps();
+ int completedTasks = completedMaps + getJob().getCompletedReduces();
+ if (lastCompletedTasks != completedTasks) {
+ lastCompletedTasks = completedTasks;
+ recalculateReduceSchedule = true;
+ }
+
if (recalculateReduceSchedule) {
preemptReducesIfNeeded();
scheduleReduces(
- getJob().getTotalMaps(), getJob().getCompletedMaps(),
+ getJob().getTotalMaps(), completedMaps,
scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
assignedRequests.maps.size(), assignedRequests.reduces.size(),
mapResourceReqt, reduceResourceReqt,
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1362213&r1=1362212&r2=1362213&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Mon Jul 16 19:13:10 2012
@@ -1409,7 +1409,63 @@ public class TestRMContainerAllocator {
maxReduceRampupLimit, reduceSlowStart);
verify(allocator).rampDownReduces(anyInt());
}
+
+ private static class RecalculateContainerAllocator extends MyContainerAllocator {
+ public boolean recalculatedReduceSchedule = false;
+
+ public RecalculateContainerAllocator(MyResourceManager rm,
+ Configuration conf, ApplicationAttemptId appAttemptId, Job job) {
+ super(rm, conf, appAttemptId, job);
+ }
+
+ @Override
+ public void scheduleReduces(int totalMaps, int completedMaps,
+ int scheduledMaps, int scheduledReduces, int assignedMaps,
+ int assignedReduces, int mapResourceReqt, int reduceResourceReqt,
+ int numPendingReduces, float maxReduceRampupLimit, float reduceSlowStart) {
+ recalculatedReduceSchedule = true;
+ }
+ }
+ @Test
+ public void testCompletedTasksRecalculateSchedule() throws Exception {
+ LOG.info("Running testCompletedTasksRecalculateSchedule");
+
+ Configuration conf = new Configuration();
+ final MyResourceManager rm = new MyResourceManager(conf);
+ rm.start();
+ DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+ .getDispatcher();
+
+ // Submit the application
+ RMApp app = rm.submitApp(1024);
+ dispatcher.await();
+
+ ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+ .getAppAttemptId();
+ JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+ Job job = mock(Job.class);
+ when(job.getReport()).thenReturn(
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false));
+ doReturn(10).when(job).getTotalMaps();
+ doReturn(10).when(job).getTotalReduces();
+ doReturn(0).when(job).getCompletedMaps();
+ RecalculateContainerAllocator allocator =
+ new RecalculateContainerAllocator(rm, conf, appAttemptId, job);
+ allocator.schedule();
+
+ allocator.recalculatedReduceSchedule = false;
+ allocator.schedule();
+ Assert.assertFalse("Unexpected recalculate of reduce schedule",
+ allocator.recalculatedReduceSchedule);
+
+ doReturn(1).when(job).getCompletedMaps();
+ allocator.schedule();
+ Assert.assertTrue("Expected recalculate of reduce schedule",
+ allocator.recalculatedReduceSchedule);
+ }
+
public static void main(String[] args) throws Exception {
TestRMContainerAllocator t = new TestRMContainerAllocator();
t.testSimple();
@@ -1418,6 +1474,7 @@ public class TestRMContainerAllocator {
t.testReportedAppProgress();
t.testReportedAppProgressWithOnlyMaps();
t.testBlackListedNodes();
+ t.testCompletedTasksRecalculateSchedule();
}
}