You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:14:04 UTC

svn commit: r1077430 - /hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java

Author: omalley
Date: Fri Mar  4 04:14:04 2011
New Revision: 1077430

URL: http://svn.apache.org/viewvc?rev=1077430&view=rev
Log:
commit b0b5aca14b99ded8cd9f2d0d31f555f5a4a73197
Author: Hong Tang <ht...@yahoo-inc.com>
Date:   Tue Apr 27 14:52:55 2010 -0700

    MAPREDUCE-1687. Stress submission policy does not always stress the cluster. (htang)
    
    From https://issues.apache.org/jira/secure/attachment/12442692/mr-1687-yhadoop-20.1xx-20100423-2.patch.
    
    +++ b/YAHOO-CHANGES.txt
    +	MAPREDUCE-1687. Stress submission policy does not always stress the
    +	cluster. (htang)
    +

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java?rev=1077430&r1=1077429&r2=1077430&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java Fri Mar  4 04:14:04 2011
@@ -36,8 +36,8 @@ import java.util.concurrent.locks.Condit
 public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
   public static final Log LOG = LogFactory.getLog(StressJobFactory.class);
 
-  private LoadStatus loadStatus = new LoadStatus();
-  private final Condition overloaded = this.lock.newCondition();
+  private final LoadStatus loadStatus = new LoadStatus();
+  private final Condition condUnderloaded = this.lock.newCondition();
   /**
    * The minimum ratio between pending+running map tasks (aka. incomplete map
    * tasks) and cluster map slot capacity for us to consider the cluster is
@@ -47,6 +47,27 @@ public class StressJobFactory extends Jo
   static final float OVERLOAD_MAPTASK_MAPSLOT_RATIO = 2.0f;
 
   /**
+   * The minimum ratio between pending+running reduce tasks (aka. incomplete
+   * reduce tasks) and cluster reduce slot capacity for us to consider the
+   * cluster is overloaded. For running reduces, we only count them partially.
+   * Namely, a 40% completed reduce is counted as 0.6 reduce tasks in our
+   * calculation.
+   */
+  static final float OVERLOAD_REDUCETASK_REDUCESLOT_RATIO = 2.5f;
+
+  /**
+   * The maximum share of the cluster's mapslot capacity that can be counted
+   * toward a job's incomplete map tasks in overload calculation.
+   */
+  static final float MAX_MAPSLOT_SHARE_PER_JOB=0.1f;
+
+  /**
+   * The maximum share of the cluster's reduceslot capacity that can be counted
+   * toward a job's incomplete reduce tasks in overload calculation.
+   */
+  static final float MAX_REDUCESLOT_SHARE_PER_JOB=0.1f;
+
+  /**
    * Creating a new instance does not start the thread.
    *
    * @param submitter   Component to which deserialized jobs are passed
@@ -63,11 +84,6 @@ public class StressJobFactory extends Jo
     throws IOException {
     super(
       submitter, jobProducer, scratch, conf, startFlag, resolver);
-
-    //Setting isOverloaded as true , now JF would wait for atleast first
-    //set of ClusterStats based on which it can decide how many job it has
-    //to submit.
-    this.loadStatus.isOverloaded = true;
   }
 
   public Thread createReaderThread() {
@@ -104,33 +120,37 @@ public class StressJobFactory extends Jo
         while (!Thread.currentThread().isInterrupted()) {
           lock.lock();
           try {
-            while (loadStatus.isOverloaded) {
+            while (loadStatus.overloaded()) {
               //Wait while JT is overloaded.
               try {
-                overloaded.await();
+                condUnderloaded.await();
               } catch (InterruptedException ie) {
                 return;
               }
             }
 
-            int noOfSlotsAvailable = loadStatus.numSlotsBackfill;
-            LOG.info("No of slots to be backfilled are " + noOfSlotsAvailable);
-
-            for (int i = 0; i < noOfSlotsAvailable; i++) {
+            while (!loadStatus.overloaded()) {
               try {
                 final JobStory job = getNextJobFiltered();
                 if (null == job) {
                   return;
                 }
-                //TODO: We need to take care of scenario when one map takes more
-                //than 1 slot.
-                i += job.getNumberMaps();
-
+                
                 submitter.add(
                   jobCreator.createGridmixJob(
                     conf, 0L, job, scratch, userResolver.getTargetUgi(
                       UserGroupInformation.createRemoteUser(
                         job.getUser())), sequence.getAndIncrement()));
+                // TODO: We need to take care of scenario when one map/reduce
+                // takes more than 1 slot.
+                loadStatus.mapSlotsBackfill -= 
+                  calcEffectiveIncompleteMapTasks(
+                    loadStatus.mapSlotCapacity, job.getNumberMaps(), 0.0f);
+                loadStatus.reduceSlotsBackfill -= 
+                  calcEffectiveIncompleteReduceTasks(
+                    loadStatus.reduceSlotCapacity, job.getNumberReduces(), 
+                    0.0f);
+                --loadStatus.numJobsBackfill;
               } catch (IOException e) {
                 LOG.error("Error while submitting the job ", e);
                 error = e;
@@ -166,12 +186,33 @@ public class StressJobFactory extends Jo
       } catch (IOException e) {
         LOG.error("Couldn't get the new Status",e);
       }
-      overloaded.signalAll();
+      if (!loadStatus.overloaded()) {
+        condUnderloaded.signalAll();
+      }
     } finally {
       lock.unlock();
     }
   }
 
+  static float calcEffectiveIncompleteMapTasks(int mapSlotCapacity,
+      int numMaps, float mapProgress) {
+    float maxEffIncompleteMapTasks = Math.max(1.0f, mapSlotCapacity
+        * MAX_MAPSLOT_SHARE_PER_JOB);
+    float mapProgressAdjusted = Math.max(Math.min(mapProgress, 1.0f), 0.0f);
+    return Math.min(maxEffIncompleteMapTasks, numMaps
+        * (1.0f - mapProgressAdjusted));
+  }
+
+  static float calcEffectiveIncompleteReduceTasks(int reduceSlotCapacity,
+      int numReduces, float reduceProgress) {
+    float maxEffIncompleteReduceTasks = Math.max(1.0f, reduceSlotCapacity
+        * MAX_REDUCESLOT_SHARE_PER_JOB);
+    float reduceProgressAdjusted = Math.max(Math.min(reduceProgress, 1.0f),
+        0.0f);
+    return Math.min(maxEffIncompleteReduceTasks, numReduces
+        * (1.0f - reduceProgressAdjusted));
+  }
+
   /**
    * We try to use some light-weight mechanism to determine cluster load.
    *
@@ -181,62 +222,98 @@ public class StressJobFactory extends Jo
    */
   private void checkLoadAndGetSlotsToBackfill(
     ClusterStats stats, ClusterStatus clusterStatus) throws IOException {
-    // If there are more jobs than number of task trackers, we assume the
-    // cluster is overloaded. 
-    if (stats.getNumRunningJob() >= clusterStatus.getTaskTrackers()) {
+    loadStatus.mapSlotCapacity = clusterStatus.getMaxMapTasks();
+    loadStatus.reduceSlotCapacity = clusterStatus.getMaxReduceTasks();
+    
+    
+    loadStatus.numJobsBackfill = clusterStatus.getTaskTrackers()
+        - stats.getNumRunningJob();
+    if (loadStatus.numJobsBackfill <= 0) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug(
-          System.currentTimeMillis() + " Overloaded is " +
-            Boolean.TRUE.toString() + " #runningJobs >= taskTrackerCount (" +
-            stats.getNumRunningJob() + " >= " +
-            clusterStatus.getTaskTrackers() + " )\n");
-      }
-      loadStatus.isOverloaded = true;
-      loadStatus.numSlotsBackfill = 0;
-      return;
+        LOG.debug(System.currentTimeMillis() + " Overloaded is "
+            + Boolean.TRUE.toString() + " NumJobsBackfill is "
+            + loadStatus.numJobsBackfill);
+      }
+      return; // stop calculation because we know it is overloaded.
     }
 
     float incompleteMapTasks = 0; // include pending & running map tasks.
     for (JobStats job : ClusterStats.getRunningJobStats()) {
       float mapProgress = job.getJob().mapProgress();
       int noOfMaps = job.getNoOfMaps();
-      incompleteMapTasks += (1 - Math.min(mapProgress,1.0))* noOfMaps;
+      incompleteMapTasks += calcEffectiveIncompleteMapTasks(clusterStatus
+          .getMaxMapTasks(), noOfMaps, mapProgress);
+    }
+    loadStatus.mapSlotsBackfill = (int) (OVERLOAD_MAPTASK_MAPSLOT_RATIO
+        * clusterStatus.getMaxMapTasks() - incompleteMapTasks);
+    if (loadStatus.mapSlotsBackfill <= 0) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(System.currentTimeMillis() + " Overloaded is "
+            + Boolean.TRUE.toString() + " MapSlotsBackfill is "
+            + loadStatus.mapSlotsBackfill);
+      }
+      return; // stop calculation because we know it is overloaded.
     }
 
-    float overloadedThreshold =
-      OVERLOAD_MAPTASK_MAPSLOT_RATIO * clusterStatus.getMaxMapTasks();
-    boolean overloaded = incompleteMapTasks > overloadedThreshold;
-    String relOp = (overloaded) ? ">" : "<=";
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(
-        System.currentTimeMillis() + " Overloaded is " + Boolean.toString(
-          overloaded) + " incompleteMapTasks " + relOp + " " +
-          OVERLOAD_MAPTASK_MAPSLOT_RATIO + "*mapSlotCapacity" + "(" +
-          incompleteMapTasks + " " + relOp + " " +
-          OVERLOAD_MAPTASK_MAPSLOT_RATIO + "*" +
-          clusterStatus.getMaxMapTasks() + ")");
-    }
-    if (overloaded) {
-      loadStatus.isOverloaded = true;
-      loadStatus.numSlotsBackfill = 0;
-    } else {
-      loadStatus.isOverloaded = false;
-      loadStatus.numSlotsBackfill =
-        (int) (overloadedThreshold - incompleteMapTasks);
+    float incompleteReduceTasks = 0; // include pending & running reduce tasks.
+    for (JobStats job : ClusterStats.getRunningJobStats()) {
+      int noOfReduces = job.getJob().getNumReduceTasks();
+      if (noOfReduces > 0) {
+        float reduceProgress = job.getJob().reduceProgress();
+        incompleteReduceTasks += calcEffectiveIncompleteReduceTasks(
+            clusterStatus.getMaxReduceTasks(), noOfReduces, reduceProgress);
+      }
+    }
+    loadStatus.reduceSlotsBackfill = (int) (OVERLOAD_REDUCETASK_REDUCESLOT_RATIO
+        * clusterStatus.getMaxReduceTasks() - incompleteReduceTasks);
+    if (loadStatus.reduceSlotsBackfill <= 0) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(System.currentTimeMillis() + " Overloaded is "
+            + Boolean.TRUE.toString() + " ReduceSlotsBackfill is "
+            + loadStatus.reduceSlotsBackfill);
+      }
+      return; // stop calculation because we know it is overloaded.
     }
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Current load Status is " + loadStatus);
+      LOG.debug(System.currentTimeMillis() + " Overloaded is "
+          + Boolean.FALSE.toString() + "Current load Status is " + loadStatus);
     }
   }
 
   static class LoadStatus {
-    boolean isOverloaded = false;
-    int numSlotsBackfill = -1;
+    int mapSlotsBackfill;
+    int mapSlotCapacity;
+    int reduceSlotsBackfill;
+    int reduceSlotCapacity;
+    int numJobsBackfill;
 
+    /**
+     * Construct the LoadStatus in an unknown state - assuming the cluster is
+     * overloaded by setting all of the backfill counters to 0.
+     */
+    LoadStatus() {
+      mapSlotsBackfill = 0;
+      reduceSlotsBackfill = 0;
+      numJobsBackfill = 0;
+      
+      mapSlotCapacity = -1;
+      reduceSlotCapacity = -1;
+    }
+    
+    public boolean overloaded() {
+      return (mapSlotsBackfill <= 0) || (reduceSlotsBackfill <= 0)
+          || (numJobsBackfill <= 0);
+    }
+    
     public String toString() {
-      return " is Overloaded " + isOverloaded + " no of slots available " +
-        numSlotsBackfill;
+      return " Overloaded = " + overloaded()
+          + ", MapSlotBackfill = " + mapSlotsBackfill 
+          + ", MapSlotCapacity = " + mapSlotCapacity
+          + ", ReduceSlotBackfill = " + reduceSlotsBackfill 
+          + ", ReduceSlotCapacity = " + reduceSlotCapacity
+          + ", NumJobsBackfill = " + numJobsBackfill
+          ;
     }
   }