You are viewing a plain-text version of this content. The link to the canonical version is not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/02/26 07:30:43 UTC

svn commit: r748035 - in /hadoop/core/branches/branch-0.19: CHANGES.txt src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java src/test/org/apache/hadoop/mapred/TestResourceEstimation.java

Author: ddas
Date: Thu Feb 26 06:30:43 2009
New Revision: 748035

URL: http://svn.apache.org/viewvc?rev=748035&view=rev
Log:
HADOOP-5241. Committing this to the 0.19 branch.

Modified:
    hadoop/core/branches/branch-0.19/CHANGES.txt
    hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java
    hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java

Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=748035&r1=748034&r2=748035&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Thu Feb 26 06:30:43 2009
@@ -13,6 +13,9 @@
     HADOOP-5280. Adds a check to prevent a task state transition from FAILED to
     any of UNASSIGNED, RUNNING, COMMIT_PENDING or SUCCEEDED. (ddas)
 
+    HADOOP-5241. Fixes a bug in disk-space resource estimation. Makes the estimation
+    formula linear where blowUp = Total-Output/Total-Input. (Sharad Agarwal via ddas)
+
 Release 0.19.1 - 2009-02-23
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java?rev=748035&r1=748034&r2=748035&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java Thu Feb 26 06:30:43 2009
@@ -34,82 +34,46 @@
   private static final Log LOG = LogFactory.getLog(
       "org.apache.hadoop.mapred.ResourceEstimator");
 
+  private long completedMapsInputSize;
+  private long completedMapsOutputSize;
 
-  /**
-   * Estimated ratio of output to (input size+1) for map tasks. 
-   */
-  private double mapBlowupRatio;
-  
-  /**
-   * How much relative weight to put on the current estimate.
-   * Each completed map has unit weight.
-   */
-  private double estimateWeight;
+  private int completedMapsUpdates;
   final private JobInProgress job;
   final private int threshholdToUse;
 
   public ResourceEstimator(JobInProgress job) {
     this.job = job;
     threshholdToUse = job.desiredMaps()/ 10;
-    mapBlowupRatio = 0;
-    estimateWeight = 1;
   }
 
+  protected synchronized void updateWithCompletedTask(TaskStatus ts, 
+      TaskInProgress tip) {
 
-  /**
-   * Have private access methods to abstract away synchro.
-   * @return
-   */
-  private synchronized double getBlowupRatio() {
-    return mapBlowupRatio;
-  }
-
-  private synchronized void setBlowupRatio(double b)  {
-    mapBlowupRatio = b;
-  }
-
-  void updateWithCompletedTask(TaskStatus ts, TaskInProgress tip) {
-    
     //-1 indicates error, which we don't average in.
     if(tip.isMapTask() &&  ts.getOutputSize() != -1)  {
-      double blowupOnThisTask = ts.getOutputSize() / 
-        ((double) tip.getMapInputSize() + 1);
-      
-      LOG.info("measured blowup on " + tip.getTIPId() + " was " +
-          ts.getOutputSize() + "/" +(tip.getMapInputSize()+1) + " = " 
-          + blowupOnThisTask);
-      
-      double newEstimate;
-      synchronized(this) {
-        newEstimate = blowupOnThisTask / estimateWeight + 
-            ((estimateWeight - 1) / estimateWeight) * getBlowupRatio();
-        estimateWeight++; 
-      }
-      setBlowupRatio(newEstimate);
-      
-      LOG.info("new estimate is blowup = " + newEstimate);
+      completedMapsUpdates++;
+
+      completedMapsInputSize+=(tip.getMapInputSize()+1);
+      completedMapsOutputSize+=ts.getOutputSize();
+
+      LOG.info("completedMapsUpdates:"+completedMapsUpdates+"  "+
+          "completedMapsInputSize:"+completedMapsInputSize+"  " +
+        "completedMapsOutputSize:"+completedMapsOutputSize);
     }
   }
 
   /**
    * @return estimated length of this job's total map output
    */
-  protected long getEstimatedTotalMapOutputSize()  {
-    double estWeight;
-    synchronized(this) {
-      estWeight = this.estimateWeight;
-    }
-    
-    if(estWeight < threshholdToUse) {
+  protected synchronized long getEstimatedTotalMapOutputSize()  {
+    if(completedMapsUpdates < threshholdToUse) {
       return 0;
     } else {
-      double blowup =getBlowupRatio();
       long inputSize = job.getInputLength() + job.desiredMaps(); 
       //add desiredMaps() so that randomwriter case doesn't blow up
-      long estimate = Math.round(inputSize * blowup * 2.0);
-  
-      LOG.debug("estimate total map output will be " + estimate +
-          " bytes. (blowup = 2*" + blowup + ")");
+      long estimate = Math.round((inputSize * 
+          completedMapsOutputSize * 2.0)/completedMapsInputSize);
+      LOG.debug("estimate total map output will be " + estimate);
       return estimate;
     }
   }

Modified: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java?rev=748035&r1=748034&r2=748035&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java (original)
+++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java Thu Feb 26 06:30:43 2009
@@ -36,7 +36,7 @@
     //unfortunately, we can't set job input size from here.
     ResourceEstimator re = new ResourceEstimator(jip);
     
-    for(int i = 0; i < maps / 10 -1; ++i) {
+    for(int i = 0; i < maps / 10 ; ++i) {
 
       long estOutSize = re.getEstimatedMapOutputSize();
       System.out.println(estOutSize);
@@ -49,10 +49,56 @@
       TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0);
       re.updateWithCompletedTask(ts, tip);
     }
+    assertEquals(2* singleMapOutputSize, re.getEstimatedMapOutputSize());
+    assertEquals(2* singleMapOutputSize * maps / reduces, re.getEstimatedReduceInputSize());
+    
+  }
+  
+  public void testWithNonZeroInput() throws Exception {
+    final int maps = 100;
+    final int reduces = 2;
+    final int singleMapOutputSize = 1000;
+    final int singleMapInputSize = 500;
+    JobConf jc = new JobConf();
+    JobID jid = new JobID("testJT", 0);
+    jc.setNumMapTasks(maps);
+    jc.setNumReduceTasks(reduces);
+    
+    JobInProgress jip = new JobInProgress(jid, jc) {
+      long getInputLength() {
+        return singleMapInputSize*desiredMaps();
+      }
+    };
+    ResourceEstimator re = new ResourceEstimator(jip);
+    
+    for(int i = 0; i < maps / 10 ; ++i) {
+
+      long estOutSize = re.getEstimatedMapOutputSize();
+      System.out.println(estOutSize);
+      assertEquals(0, estOutSize);
+      
+      TaskStatus ts = new MapTaskStatus();
+      ts.setOutputSize(singleMapOutputSize);
+      RawSplit split = new RawSplit();
+      split.setDataLength(singleMapInputSize);
+      TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0);
+      re.updateWithCompletedTask(ts, tip);
+    }
     
     assertEquals(2* singleMapOutputSize, re.getEstimatedMapOutputSize());
     assertEquals(2* singleMapOutputSize * maps / reduces, re.getEstimatedReduceInputSize());
+
+    //add one more map task with input size as 0
+    TaskStatus ts = new MapTaskStatus();
+    ts.setOutputSize(singleMapOutputSize);
+    RawSplit split = new RawSplit();
+    split.setDataLength(0);
+    TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0);
+    re.updateWithCompletedTask(ts, tip);
     
+    long expectedTotalMapOutSize = (singleMapOutputSize*11) * 
+      ((maps*singleMapInputSize)+maps)/((singleMapInputSize+1)*10+1);
+    assertEquals(2* expectedTotalMapOutSize/maps, re.getEstimatedMapOutputSize());
   }
 
 }