You are viewing a plain-text version of this content. The canonical version is available via the original archive link, which is not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/02/23 11:01:03 UTC

svn commit: r746945 - in /hadoop/core/branches/branch-0.20: ./ CHANGES.txt src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java src/test/org/apache/hadoop/mapred/TestResourceEstimation.java

Author: ddas
Date: Mon Feb 23 10:01:02 2009
New Revision: 746945

URL: http://svn.apache.org/viewvc?rev=746945&view=rev
Log:
Merge -r 746943:746944 from trunk onto 0.20 branch. Fixes HADOOP-5241.

Modified:
    hadoop/core/branches/branch-0.20/   (props changed)
    hadoop/core/branches/branch-0.20/CHANGES.txt   (contents, props changed)
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java
    hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java

Propchange: hadoop/core/branches/branch-0.20/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Feb 23 10:01:02 2009
@@ -1,2 +1,2 @@
 /hadoop/core/branches/branch-0.19:713112
-/hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274,746902-746903
+/hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274,746902-746903,746944

Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=746945&r1=746944&r2=746945&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Mon Feb 23 10:01:02 2009
@@ -645,6 +645,9 @@
     references to completedJobStore outside the block where the JobTracker is locked.
     (ddas)
 
+    HADOOP-5241. Fixes a bug in disk-space resource estimation. Makes the estimation
+    formula linear where blowUp = Total-Output/Total-Input. (Sharad Agarwal via ddas)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

Propchange: hadoop/core/branches/branch-0.20/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Feb 23 10:01:02 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.18/CHANGES.txt:727226
 /hadoop/core/branches/branch-0.19/CHANGES.txt:713112
-/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233,746274,746902-746903
+/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233,746274,746902-746903,746944

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java?rev=746945&r1=746944&r2=746945&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java Mon Feb 23 10:01:02 2009
@@ -34,82 +34,46 @@
   private static final Log LOG = LogFactory.getLog(
       "org.apache.hadoop.mapred.ResourceEstimator");
 
+  private long completedMapsInputSize;
+  private long completedMapsOutputSize;
 
-  /**
-   * Estimated ratio of output to (input size+1) for map tasks. 
-   */
-  private double mapBlowupRatio;
-  
-  /**
-   * How much relative weight to put on the current estimate.
-   * Each completed map has unit weight.
-   */
-  private double estimateWeight;
+  private int completedMapsUpdates;
   final private JobInProgress job;
   final private int threshholdToUse;
 
   public ResourceEstimator(JobInProgress job) {
     this.job = job;
     threshholdToUse = job.desiredMaps()/ 10;
-    mapBlowupRatio = 0;
-    estimateWeight = 1;
   }
 
+  protected synchronized void updateWithCompletedTask(TaskStatus ts, 
+      TaskInProgress tip) {
 
-  /**
-   * Have private access methods to abstract away synchro.
-   * @return
-   */
-  private synchronized double getBlowupRatio() {
-    return mapBlowupRatio;
-  }
-
-  private synchronized void setBlowupRatio(double b)  {
-    mapBlowupRatio = b;
-  }
-
-  void updateWithCompletedTask(TaskStatus ts, TaskInProgress tip) {
-    
     //-1 indicates error, which we don't average in.
     if(tip.isMapTask() &&  ts.getOutputSize() != -1)  {
-      double blowupOnThisTask = ts.getOutputSize() / 
-        ((double) tip.getMapInputSize() + 1);
-      
-      LOG.info("measured blowup on " + tip.getTIPId() + " was " +
-          ts.getOutputSize() + "/" +(tip.getMapInputSize()+1) + " = " 
-          + blowupOnThisTask);
-      
-      double newEstimate;
-      synchronized(this) {
-        newEstimate = blowupOnThisTask / estimateWeight + 
-            ((estimateWeight - 1) / estimateWeight) * getBlowupRatio();
-        estimateWeight++; 
-      }
-      setBlowupRatio(newEstimate);
-      
-      LOG.info("new estimate is blowup = " + newEstimate);
+      completedMapsUpdates++;
+
+      completedMapsInputSize+=(tip.getMapInputSize()+1);
+      completedMapsOutputSize+=ts.getOutputSize();
+
+      LOG.info("completedMapsUpdates:"+completedMapsUpdates+"  "+
+          "completedMapsInputSize:"+completedMapsInputSize+"  " +
+        "completedMapsOutputSize:"+completedMapsOutputSize);
     }
   }
 
   /**
    * @return estimated length of this job's total map output
    */
-  protected long getEstimatedTotalMapOutputSize()  {
-    double estWeight;
-    synchronized(this) {
-      estWeight = this.estimateWeight;
-    }
-    
-    if(estWeight < threshholdToUse) {
+  protected synchronized long getEstimatedTotalMapOutputSize()  {
+    if(completedMapsUpdates < threshholdToUse) {
       return 0;
     } else {
-      double blowup =getBlowupRatio();
       long inputSize = job.getInputLength() + job.desiredMaps(); 
       //add desiredMaps() so that randomwriter case doesn't blow up
-      long estimate = Math.round(inputSize * blowup * 2.0);
-  
-      LOG.debug("estimate total map output will be " + estimate +
-          " bytes. (blowup = 2*" + blowup + ")");
+      long estimate = Math.round((inputSize * 
+          completedMapsOutputSize * 2.0)/completedMapsInputSize);
+      LOG.debug("estimate total map output will be " + estimate);
       return estimate;
     }
   }

Modified: hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java?rev=746945&r1=746944&r2=746945&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java (original)
+++ hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java Mon Feb 23 10:01:02 2009
@@ -36,7 +36,7 @@
     //unfortunately, we can't set job input size from here.
     ResourceEstimator re = new ResourceEstimator(jip);
     
-    for(int i = 0; i < maps / 10 -1; ++i) {
+    for(int i = 0; i < maps / 10 ; ++i) {
 
       long estOutSize = re.getEstimatedMapOutputSize();
       System.out.println(estOutSize);
@@ -49,10 +49,56 @@
       TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0);
       re.updateWithCompletedTask(ts, tip);
     }
+    assertEquals(2* singleMapOutputSize, re.getEstimatedMapOutputSize());
+    assertEquals(2* singleMapOutputSize * maps / reduces, re.getEstimatedReduceInputSize());
+    
+  }
+  
+  public void testWithNonZeroInput() throws Exception {
+    final int maps = 100;
+    final int reduces = 2;
+    final int singleMapOutputSize = 1000;
+    final int singleMapInputSize = 500;
+    JobConf jc = new JobConf();
+    JobID jid = new JobID("testJT", 0);
+    jc.setNumMapTasks(maps);
+    jc.setNumReduceTasks(reduces);
+    
+    JobInProgress jip = new JobInProgress(jid, jc) {
+      long getInputLength() {
+        return singleMapInputSize*desiredMaps();
+      }
+    };
+    ResourceEstimator re = new ResourceEstimator(jip);
+    
+    for(int i = 0; i < maps / 10 ; ++i) {
+
+      long estOutSize = re.getEstimatedMapOutputSize();
+      System.out.println(estOutSize);
+      assertEquals(0, estOutSize);
+      
+      TaskStatus ts = new MapTaskStatus();
+      ts.setOutputSize(singleMapOutputSize);
+      RawSplit split = new RawSplit();
+      split.setDataLength(singleMapInputSize);
+      TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0);
+      re.updateWithCompletedTask(ts, tip);
+    }
     
     assertEquals(2* singleMapOutputSize, re.getEstimatedMapOutputSize());
     assertEquals(2* singleMapOutputSize * maps / reduces, re.getEstimatedReduceInputSize());
+
+    //add one more map task with input size as 0
+    TaskStatus ts = new MapTaskStatus();
+    ts.setOutputSize(singleMapOutputSize);
+    RawSplit split = new RawSplit();
+    split.setDataLength(0);
+    TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0);
+    re.updateWithCompletedTask(ts, tip);
     
+    long expectedTotalMapOutSize = (singleMapOutputSize*11) * 
+      ((maps*singleMapInputSize)+maps)/((singleMapInputSize+1)*10+1);
+    assertEquals(2* expectedTotalMapOutSize/maps, re.getEstimatedMapOutputSize());
   }
 
 }