You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by bo...@apache.org on 2012/07/16 21:13:10 UTC

svn commit: r1362213 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/

Author: bobby
Date: Mon Jul 16 19:13:10 2012
New Revision: 1362213

URL: http://svn.apache.org/viewvc?rev=1362213&view=rev
Log:
svn merge -c 1362209 FIXES: MAPREDUCE-4437. Race in MR ApplicationMaster can cause reducers to never be scheduled (Jason Lowe via bobby)

Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1362213&r1=1362212&r2=1362213&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Mon Jul 16 19:13:10 2012
@@ -573,6 +573,9 @@ Release 0.23.3 - UNRELEASED
     MAPREDUCE-4299. Terasort hangs with MR2 FifoScheduler (Tom White via
     bobby)
 
+    MAPREDUCE-4437. Race in MR ApplicationMaster can cause reducers to never be
+    scheduled (Jason Lowe via bobby)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1362213&r1=1362212&r2=1362213&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Mon Jul 16 19:13:10 2012
@@ -21,7 +21,6 @@ package org.apache.hadoop.mapreduce.v2.a
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -47,9 +46,6 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
-import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.app.job.Task;
-import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
@@ -131,6 +127,7 @@ public class RMContainerAllocator extend
   private int containersReleased = 0;
   private int hostLocalAssigned = 0;
   private int rackLocalAssigned = 0;
+  private int lastCompletedTasks = 0;
   
   private boolean recalculateReduceSchedule = false;
   private int mapResourceReqt;//memory
@@ -214,11 +211,18 @@ public class RMContainerAllocator extend
       scheduledRequests.assign(allocatedContainers);
       LOG.info("After Assign: " + getStat());
     }
-    
+
+    int completedMaps = getJob().getCompletedMaps();
+    int completedTasks = completedMaps + getJob().getCompletedReduces();
+    if (lastCompletedTasks != completedTasks) {
+      lastCompletedTasks = completedTasks;
+      recalculateReduceSchedule = true;
+    }
+
     if (recalculateReduceSchedule) {
       preemptReducesIfNeeded();
       scheduleReduces(
-          getJob().getTotalMaps(), getJob().getCompletedMaps(),
+          getJob().getTotalMaps(), completedMaps,
           scheduledRequests.maps.size(), scheduledRequests.reduces.size(), 
           assignedRequests.maps.size(), assignedRequests.reduces.size(),
           mapResourceReqt, reduceResourceReqt,

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1362213&r1=1362212&r2=1362213&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Mon Jul 16 19:13:10 2012
@@ -1409,7 +1409,63 @@ public class TestRMContainerAllocator {
         maxReduceRampupLimit, reduceSlowStart);
     verify(allocator).rampDownReduces(anyInt());
   }
+
+  private static class RecalculateContainerAllocator extends MyContainerAllocator {
+    public boolean recalculatedReduceSchedule = false;
+
+    public RecalculateContainerAllocator(MyResourceManager rm,
+        Configuration conf, ApplicationAttemptId appAttemptId, Job job) {
+      super(rm, conf, appAttemptId, job);
+    }
+
+    @Override
+    public void scheduleReduces(int totalMaps, int completedMaps,
+        int scheduledMaps, int scheduledReduces, int assignedMaps,
+        int assignedReduces, int mapResourceReqt, int reduceResourceReqt,
+        int numPendingReduces, float maxReduceRampupLimit, float reduceSlowStart) {
+      recalculatedReduceSchedule = true;
+    }
+  }
   
+  @Test
+  public void testCompletedTasksRecalculateSchedule() throws Exception {
+    LOG.info("Running testCompletedTasksRecalculateSchedule");
+
+    Configuration conf = new Configuration();
+    final MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job job = mock(Job.class);
+    when(job.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false));
+    doReturn(10).when(job).getTotalMaps();
+    doReturn(10).when(job).getTotalReduces();
+    doReturn(0).when(job).getCompletedMaps();
+    RecalculateContainerAllocator allocator =
+        new RecalculateContainerAllocator(rm, conf, appAttemptId, job);
+    allocator.schedule();
+
+    allocator.recalculatedReduceSchedule = false;
+    allocator.schedule();
+    Assert.assertFalse("Unexpected recalculate of reduce schedule",
+        allocator.recalculatedReduceSchedule);
+
+    doReturn(1).when(job).getCompletedMaps();
+    allocator.schedule();
+    Assert.assertTrue("Expected recalculate of reduce schedule",
+        allocator.recalculatedReduceSchedule);
+  }
+
   public static void main(String[] args) throws Exception {
     TestRMContainerAllocator t = new TestRMContainerAllocator();
     t.testSimple();
@@ -1418,6 +1474,7 @@ public class TestRMContainerAllocator {
     t.testReportedAppProgress();
     t.testReportedAppProgressWithOnlyMaps();
     t.testBlackListedNodes();
+    t.testCompletedTasksRecalculateSchedule();
   }
 
 }