Posted to mapreduce-issues@hadoop.apache.org by "Yang Hao (JIRA)" <ji...@apache.org> on 2014/11/27 14:32:12 UTC

[jira] [Updated] (MAPREDUCE-6176) Users should be able to limit the number of tasks of an application

     [ https://issues.apache.org/jira/browse/MAPREDUCE-6176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yang Hao updated MAPREDUCE-6176:
--------------------------------
    Release Note:   (was: diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 7c18f06..6f69168 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -111,16 +111,18 @@
   completed -> request corresponding to which container has completed
   
   Lifecycle of map
-  scheduled->assigned->completed
-  
+  pending->scheduled->assigned->completed
+
   Lifecycle of reduce
   pending->scheduled->assigned->completed
-  
-  Maps are scheduled as soon as their requests are received. Reduces are 
-  added to the pending and are ramped up (added to scheduled) based 
-  on completed maps and current availability in the cluster.
+
+  Maps are added to the pending and are scheduled when the number of assigned maps plus scheduled maps is below the maximum number of maps. Reduces are
+  added to the pending and are ramped up (added to scheduled) based
+  on completed maps and current availability in the cluster, as well as the maximum number of reduces.
   */
-  
+  //maps which are not yet scheduled
+  private final LinkedList<ContainerRequestEvent> pendingMaps =
+    new LinkedList<ContainerRequestEvent>();
   //reduces which are not yet scheduled
   private final LinkedList<ContainerRequest> pendingReduces = 
     new LinkedList<ContainerRequest>();
@@ -176,6 +178,14 @@ protected void serviceInit(Configuration conf) throws Exception {
     // Init startTime to current time. If all goes well, it will be reset after
     // first attempt to contact RM.
     retrystartTime = System.currentTimeMillis();
+    scheduleStats.numMaxMaps = conf.getInt(MRJobConfig.MR_MAP_NUM_MAX, Integer.MAX_VALUE);
+    if (scheduleStats.numMaxMaps <= 0) {
+      scheduleStats.numMaxMaps = Integer.MAX_VALUE;
+    }
+    scheduleStats.numMaxReduces = conf.getInt(MRJobConfig.MR_REDUCE_NUM_MAX, Integer.MAX_VALUE);
+    if (scheduleStats.numMaxReduces <= 0) {
+      scheduleStats.numMaxReduces = Integer.MAX_VALUE;
+    }
   }
 
   @Override
@@ -216,6 +226,7 @@ public void run() {
 
   @Override
   protected synchronized void heartbeat() throws Exception {
+    scheduleMaps();
     scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
     List<Container> allocatedContainers = getResources();
     if (allocatedContainers.size() > 0) {
@@ -233,12 +244,13 @@ protected synchronized void heartbeat() throws Exception {
     if (recalculateReduceSchedule) {
       preemptReducesIfNeeded();
       scheduleReduces(
-          getJob().getTotalMaps(), completedMaps,
-          scheduledRequests.maps.size(), scheduledRequests.reduces.size(), 
-          assignedRequests.maps.size(), assignedRequests.reduces.size(),
-          mapResourceReqt, reduceResourceReqt,
-          pendingReduces.size(), 
-          maxReduceRampupLimit, reduceSlowStart);
+              getJob().getTotalMaps(), completedMaps,
+              scheduledRequests.maps.size() + pendingMaps.size(),
+              scheduledRequests.reduces.size(),
+              assignedRequests.maps.size(), assignedRequests.reduces.size(),
+              mapResourceReqt, reduceResourceReqt,
+              pendingReduces.size(),
+              maxReduceRampupLimit, reduceSlowStart);
       recalculateReduceSchedule = false;
     }
 
@@ -313,7 +325,14 @@ protected synchronized void handleEvent(ContainerAllocatorEvent event) {
         //set the rounded off memory
         reqEvent.getCapability().setMemory(mapResourceReqt.getMemory());
         reqEvent.getCapability().setVirtualCores(mapResourceReqt.getVirtualCores());
-        scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
+        //scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
+        // Add the map request to the pending queue, then schedule
+        if (reqEvent.getEarlierAttemptFailed()) {
+          pendingMaps.addFirst(reqEvent);
+        } else {
+          pendingMaps.add(reqEvent);
+        }
+        scheduleMaps();
       } else {
         if (reduceResourceReqt.equals(Resources.none())) {
           reduceResourceReqt = reqEvent.getCapability();
@@ -375,6 +394,16 @@ protected synchronized void handleEvent(ContainerAllocatorEvent event) {
     }
   }
 
+  private void scheduleMaps() {
+    LOG.info("scheduling maps from pending queue");
+    //more map to be scheduled
+    int num = scheduleStats.numMaxMaps - scheduledRequests.maps.size() - assignedRequests.maps.size();
+    num = Math.min(num, pendingMaps.size());
+    for (int i = 0; i < num; i++) {
+      ContainerRequestEvent request = pendingMaps.removeFirst();
+      scheduledRequests.addMap(request);
+    }
+  }
   private static String getHost(String contMgrAddress) {
     String host = contMgrAddress;
     String[] hostport = host.split(":");
@@ -433,12 +462,12 @@ private void preemptReducesIfNeeded() {
   @Private
   public void scheduleReduces(
       int totalMaps, int completedMaps,
-      int scheduledMaps, int scheduledReduces,
+      int scheduledAndPendingMaps, int scheduledReduces,
       int assignedMaps, int assignedReduces,
       Resource mapResourceReqt, Resource reduceResourceReqt,
       int numPendingReduces,
       float maxReduceRampupLimit, float reduceSlowStart) {
-    
+
     if (numPendingReduces == 0) {
       return;
     }
@@ -465,7 +494,7 @@ public void scheduleReduces(
     
     //if all maps are assigned, then ramp up all reduces irrespective of the
     //headroom
-    if (scheduledMaps == 0 && numPendingReduces > 0) {
+    if (scheduledAndPendingMaps == 0 && numPendingReduces > 0) {
       LOG.info("All maps assigned. " +
           "Ramping up all remaining reduces:" + numPendingReduces);
       scheduleAllReduces();
@@ -478,9 +507,9 @@ public void scheduleReduces(
     } else {
       completedMapPercent = 1;
     }
-    
-    Resource netScheduledMapResource = 
-        Resources.multiply(mapResourceReqt, (scheduledMaps + assignedMaps));
+
+    Resource netScheduledMapResource =
+            Resources.multiply(mapResourceReqt, (scheduledAndPendingMaps + assignedMaps));
 
     Resource netScheduledReduceResource = 
         Resources.multiply(reduceResourceReqt, (scheduledReduces + assignedReduces));
@@ -499,8 +528,8 @@ public void scheduleReduces(
     // check if there aren't enough maps scheduled, give the free map capacity
     // to reduce. 
     // Even when container number equals, there may be unused resources in one dimension
-    if (Resources.computeAvailableContainers(ideaMapResourceLimit, mapResourceReqt) 
-        >= (scheduledMaps + assignedMaps)) {
+    if (Resources.computeAvailableContainers(ideaMapResourceLimit, mapResourceReqt)
+            >= (scheduledAndPendingMaps + assignedMaps)) {
       // enough resource given to maps, given the remaining to reduces
       Resource unusedMapResourceLimit = Resources.subtract(ideaMapResourceLimit, netScheduledMapResource);
       finalReduceResourceLimit = Resources.add(idealReduceResourceLimit, unusedMapResourceLimit);
@@ -524,7 +553,7 @@ public void scheduleReduces(
     
     if (rampUp > 0) {
       rampUp = Math.min(rampUp, numPendingReduces);
-      LOG.info("Ramping up " + rampUp);
+
       rampUpReduces(rampUp);
     } else if (rampUp < 0){
       int rampDown = -1 * rampUp;
@@ -536,15 +565,18 @@ public void scheduleReduces(
 
   @Private
   public void scheduleAllReduces() {
-    for (ContainerRequest req : pendingReduces) {
-      scheduledRequests.addReduce(req);
-    }
-    pendingReduces.clear();
+//    for (ContainerRequest req : pendingReduces) {
+//      scheduledRequests.addReduce(req);
+//    }
+//    pendingReduces.clear();
+    rampUpReduces(Integer.MAX_VALUE);
   }
   
   @Private
   public void rampUpReduces(int rampUp) {
     //more reduce to be scheduled
+    rampUp = Math.min(rampUp, scheduleStats.numMaxReduces - scheduledRequests.reduces.size()-assignedRequests.reduces.size());
+    LOG.info("Ramping up " + rampUp);
     for (int i = 0; i < rampUp; i++) {
       ContainerRequest request = pendingReduces.removeFirst();
       scheduledRequests.addReduce(request);
@@ -1202,6 +1234,9 @@ ContainerId get(TaskAttemptId tId) {
   }
 
   private class ScheduleStats {
+    int numMaxMaps;
+    int numMaxReduces;
+    int numPendingMaps;
     int numPendingReduces;
     int numScheduledMaps;
     int numScheduledReduces;
@@ -1217,6 +1252,8 @@ public void updateAndLogIfChanged(String msgPrefix) {
 
       // synchronized to fix findbug warnings
       synchronized (RMContainerAllocator.this) {
+        changed |= (numPendingMaps != pendingMaps.size());
+        numPendingMaps = pendingMaps.size();
         changed |= (numPendingReduces != pendingReduces.size());
         numPendingReduces = pendingReduces.size();
         changed |= (numScheduledMaps != scheduledRequests.maps.size());
@@ -1243,7 +1280,8 @@ public void updateAndLogIfChanged(String msgPrefix) {
     }
 
     public void log(String msgPrefix) {
-        LOG.info(msgPrefix + "PendingReds:" + numPendingReduces +
+        LOG.info(msgPrefix + " PendingMaps: " + numPendingMaps +
+        " PendingReds:" + numPendingReduces +
         " ScheduledMaps:" + numScheduledMaps +
         " ScheduledReds:" + numScheduledReduces +
         " AssignedMaps:" + numAssignedMaps +
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index fe96c52..66aa3dd 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -738,5 +738,11 @@
   public static final int DEFAULT_MR_AM_MAX_ATTEMPTS = 2;
   
   public static final String MR_APPLICATION_TYPE = "MAPREDUCE";
-  
+
+  /**
+   * Limit the maximum number of map and reduce tasks of a job.
+   */
+  public static final String MR_MAP_NUM_MAX = "mapreduce.map.num.max";
+
+  public static final String MR_REDUCE_NUM_MAX = "mapreduce.reduce.num.max";
 }
)

> Users should be able to limit the number of tasks of an application
> -------------------------------------------------------------------
>
>                 Key: MAPREDUCE-6176
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-6176
>             Project: Hadoop Map/Reduce
>          Issue Type: New Feature
>          Components: mr-am, mrv2
>    Affects Versions: 2.5.0, 2.4.1, 2.5.1, 2.5.2
>            Reporter: Yang Hao
>            Assignee: Yang Hao
>              Labels: patch
>             Fix For: 2.4.1
>
>
> As MapReduce is a batch computation framework, people may want to run application A alongside application B. A good way to enable this is to let users limit the number of an application's map or reduce tasks. If we set mapreduce.map.num.max to M, the number of map tasks will not exceed M. Likewise, if we set mapreduce.reduce.num.max to R, the number of reduce tasks will not exceed R.
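>
> For illustration, a minimal sketch of how a job could opt in to the proposed limits, assuming the patch above is applied. The property names mapreduce.map.num.max and mapreduce.reduce.num.max are the ones the patch adds to MRJobConfig; the class name and the values 50 and 10 below are hypothetical:
>
>   import org.apache.hadoop.conf.Configuration;
>   import org.apache.hadoop.mapreduce.Job;
>
>   public class CappedJobExample {
>     public static void main(String[] args) throws Exception {
>       Configuration conf = new Configuration();
>       // Hypothetical example values: cap how many map/reduce requests the AM
>       // keeps scheduled or assigned at once; per the patch, values <= 0 fall
>       // back to Integer.MAX_VALUE (no limit).
>       conf.setInt("mapreduce.map.num.max", 50);
>       conf.setInt("mapreduce.reduce.num.max", 10);
>       Job job = Job.getInstance(conf, "capped job");
>       // ... set mapper, reducer, input and output paths as usual ...
>       System.exit(job.waitForCompletion(true) ? 0 : 1);
>     }
>   }
>
> With the RMContainerAllocator changes above, requests beyond the limits wait in the pendingMaps/pendingReduces queues until running tasks complete, so the caps bound how many tasks run concurrently rather than the total number of tasks.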



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)