You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by bo...@apache.org on 2012/07/16 21:13:55 UTC
svn commit: r1362214 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/...
Author: bobby
Date: Mon Jul 16 19:13:55 2012
New Revision: 1362214
URL: http://svn.apache.org/viewvc?rev=1362214&view=rev
Log:
svn merge -c 1362209 FIXES: MAPREDUCE-4437. Race in MR ApplicationMaster can cause reducers to never be scheduled (Jason Lowe via bobby)
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1362214&r1=1362213&r2=1362214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Mon Jul 16 19:13:55 2012
@@ -328,6 +328,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4299. Terasort hangs with MR2 FifoScheduler (Tom White via
bobby)
+ MAPREDUCE-4437. Race in MR ApplicationMaster can cause reducers to never be
+ scheduled (Jason Lowe via bobby)
+
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1362214&r1=1362213&r2=1362214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Mon Jul 16 19:13:55 2012
@@ -122,6 +122,7 @@ public class RMContainerAllocator extend
private int containersReleased = 0;
private int hostLocalAssigned = 0;
private int rackLocalAssigned = 0;
+ private int lastCompletedTasks = 0;
private boolean recalculateReduceSchedule = false;
private int mapResourceReqt;//memory
@@ -205,11 +206,18 @@ public class RMContainerAllocator extend
scheduledRequests.assign(allocatedContainers);
LOG.info("After Assign: " + getStat());
}
-
+
+ int completedMaps = getJob().getCompletedMaps();
+ int completedTasks = completedMaps + getJob().getCompletedReduces();
+ if (lastCompletedTasks != completedTasks) {
+ lastCompletedTasks = completedTasks;
+ recalculateReduceSchedule = true;
+ }
+
if (recalculateReduceSchedule) {
preemptReducesIfNeeded();
scheduleReduces(
- getJob().getTotalMaps(), getJob().getCompletedMaps(),
+ getJob().getTotalMaps(), completedMaps,
scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
assignedRequests.maps.size(), assignedRequests.reduces.size(),
mapResourceReqt, reduceResourceReqt,
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1362214&r1=1362213&r2=1362214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Mon Jul 16 19:13:55 2012
@@ -1310,7 +1310,63 @@ public class TestRMContainerAllocator {
maxReduceRampupLimit, reduceSlowStart);
verify(allocator).rampDownReduces(anyInt());
}
+
+ private static class RecalculateContainerAllocator extends MyContainerAllocator {
+ public boolean recalculatedReduceSchedule = false;
+
+ public RecalculateContainerAllocator(MyResourceManager rm,
+ Configuration conf, ApplicationAttemptId appAttemptId, Job job) {
+ super(rm, conf, appAttemptId, job);
+ }
+
+ @Override
+ public void scheduleReduces(int totalMaps, int completedMaps,
+ int scheduledMaps, int scheduledReduces, int assignedMaps,
+ int assignedReduces, int mapResourceReqt, int reduceResourceReqt,
+ int numPendingReduces, float maxReduceRampupLimit, float reduceSlowStart) {
+ recalculatedReduceSchedule = true;
+ }
+ }
+ @Test
+ public void testCompletedTasksRecalculateSchedule() throws Exception {
+ LOG.info("Running testCompletedTasksRecalculateSchedule");
+
+ Configuration conf = new Configuration();
+ final MyResourceManager rm = new MyResourceManager(conf);
+ rm.start();
+ DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+ .getDispatcher();
+
+ // Submit the application
+ RMApp app = rm.submitApp(1024);
+ dispatcher.await();
+
+ ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+ .getAppAttemptId();
+ JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+ Job job = mock(Job.class);
+ when(job.getReport()).thenReturn(
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false));
+ doReturn(10).when(job).getTotalMaps();
+ doReturn(10).when(job).getTotalReduces();
+ doReturn(0).when(job).getCompletedMaps();
+ RecalculateContainerAllocator allocator =
+ new RecalculateContainerAllocator(rm, conf, appAttemptId, job);
+ allocator.schedule();
+
+ allocator.recalculatedReduceSchedule = false;
+ allocator.schedule();
+ Assert.assertFalse("Unexpected recalculate of reduce schedule",
+ allocator.recalculatedReduceSchedule);
+
+ doReturn(1).when(job).getCompletedMaps();
+ allocator.schedule();
+ Assert.assertTrue("Expected recalculate of reduce schedule",
+ allocator.recalculatedReduceSchedule);
+ }
+
public static void main(String[] args) throws Exception {
TestRMContainerAllocator t = new TestRMContainerAllocator();
t.testSimple();
@@ -1319,6 +1375,7 @@ public class TestRMContainerAllocator {
t.testReportedAppProgress();
t.testReportedAppProgressWithOnlyMaps();
t.testBlackListedNodes();
+ t.testCompletedTasksRecalculateSchedule();
}
}