You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text version.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/08/27 08:39:23 UTC

svn commit: r689380 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java src/test/org/apache/hadoop/mapred/TestResourceEstimation.java

Author: omalley
Date: Tue Aug 26 23:39:22 2008
New Revision: 689380

URL: http://svn.apache.org/viewvc?rev=689380&view=rev
Log:
HADOOP-3961. Fix task disk-space requirement estimates for jobs with virtual
input. Disk-space estimates no longer limit task placement until 10% of the
maps have finished. (Ari Rabkin via omalley)

Added:
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=689380&r1=689379&r2=689380&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Aug 26 23:39:22 2008
@@ -388,6 +388,10 @@
     HADOOP-4030. Remove lzop from the default list of codecs. (Arun Murthy via
     cdouglas)
 
+    HADOOP-3961. Fix task disk space requirement estimates for virtual
+    input jobs. Delays limiting task placement until after 10% of the maps
+    have finished. (Ari Rabkin via omalley)
+
 Release 0.18.1 - Unreleased
 
   BUG FIXES

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java?rev=689380&r1=689379&r2=689380&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java Tue Aug 26 23:39:22 2008
@@ -37,23 +37,23 @@
 
 
   /**
-   * Estimated ratio of output to input size for map tasks. 
+   * Estimated ratio of output to (input size+1) for map tasks. 
    */
   private double mapBlowupRatio;
+  
+  /**
+   * How much relative weight to put on the current estimate.
+   * Each completed map has unit weight.
+   */
   private double estimateWeight;
-  private JobInProgress job;
-
-  //guess a factor of two blowup due to temp space for merge
-  public static final double INITIAL_BLOWUP_GUESS = 1; 
-
-  //initial estimate is weighted as much as this fraction of the real datapoints
-  static final double INITIAL_EST_WEIGHT_PERCENT = 0.05; 
-
+  final private JobInProgress job;
+  final private int threshholdToUse;
 
   public ResourceEstimator(JobInProgress job) {
-    mapBlowupRatio = INITIAL_BLOWUP_GUESS;
     this.job = job;
-    estimateWeight = INITIAL_EST_WEIGHT_PERCENT * job.desiredMaps();
+    threshholdToUse = job.desiredMaps()/ 10;
+    mapBlowupRatio = 0;
+    estimateWeight = 1;
   }
 
 
@@ -69,45 +69,71 @@
     mapBlowupRatio = b;
   }
 
-
-
-  public void updateWithCompletedTask(TaskStatus ts, TaskInProgress tip) {
-
+  void updateWithCompletedTask(TaskStatus ts, TaskInProgress tip) {
+    
     //-1 indicates error, which we don't average in.
     if(tip.isMapTask() &&  ts.getOutputSize() != -1)  {
       double blowupOnThisTask = ts.getOutputSize() / 
-        (double) tip.getMapInputSize();
+        ((double) tip.getMapInputSize() + 1);
       
       LOG.info("measured blowup on " + tip.getTIPId() + " was " +
-          ts.getOutputSize() + "/" +tip.getMapInputSize() + " = " 
+          ts.getOutputSize() + "/" +(tip.getMapInputSize()+1) + " = " 
           + blowupOnThisTask);
       
-      double newEstimate = blowupOnThisTask / estimateWeight + 
-          ((estimateWeight - 1) / estimateWeight) * getBlowupRatio();
-      estimateWeight++; 
+      double newEstimate;
+      synchronized(this) {
+        newEstimate = blowupOnThisTask / estimateWeight + 
+            ((estimateWeight - 1) / estimateWeight) * getBlowupRatio();
+        estimateWeight++; 
+      }
       setBlowupRatio(newEstimate);
+      
+      LOG.info("new estimate is blowup = " + newEstimate);
     }
   }
 
   /**
-   * 
+   * @return estimated length of this job's total map output
+   */
+  protected long getEstimatedTotalMapOutputSize()  {
+    double estWeight;
+    synchronized(this) {
+      estWeight = this.estimateWeight;
+    }
+    
+    if(estWeight < threshholdToUse) {
+      return 0;
+    } else {
+      double blowup =getBlowupRatio();
+      long inputSize = job.getInputLength() + job.desiredMaps(); 
+      //add desiredMaps() so that randomwriter case doesn't blow up
+      long estimate = Math.round(inputSize * blowup * 2.0);
+  
+      LOG.debug("estimate total map output will be " + estimate +
+          " bytes. (blowup = 2*" + blowup + ")");
+      return estimate;
+    }
+  }
+  
+  /**
    * @return estimated length of this job's average map output
-   * @throws IOException if the split's getLength() does.
    */
-  public long getEstimatedMapOutputSize()  {
-    double blowup =getBlowupRatio();
-    long estimate =  
-      (long) (job.getInputLength() * blowup / job.desiredMaps() * 2.0);
-    LOG.info("estimate map will take " + estimate +
-        " bytes. (blowup = 2*" + blowup + ")");
+  long getEstimatedMapOutputSize() {
+    long estimate = getEstimatedTotalMapOutputSize()  / job.desiredMaps();
     return estimate;
   }
 
-
-  //estimate that each reduce gets an equal share of total map output
-  public long getEstimatedReduceInputSize() {
-    return 
-       getEstimatedMapOutputSize() * job.desiredMaps() / job.desiredReduces();
+  /**
+   * 
+   * @return estimated length of this job's average reduce input
+   */
+  long getEstimatedReduceInputSize() {
+    if(job.desiredReduces() == 0) {//no reduce output, so no size
+      return 0;
+    } else {
+      return getEstimatedTotalMapOutputSize() / job.desiredReduces();
+      //estimate that each reduce gets an equal share of total map output
+    }
   }
   
 

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java?rev=689380&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java Tue Aug 26 23:39:22 2008
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.mapred.JobClient.RawSplit;
+
+public class TestResourceEstimation extends TestCase {
+  
+
+  public void testResourceEstimator() throws Exception {
+    final int maps = 100;
+    final int reduces = 2;
+    final int singleMapOutputSize = 1000;
+    JobConf jc = new JobConf();
+    JobID jid = new JobID("testJT", 0);
+    jc.setNumMapTasks(maps);
+    jc.setNumReduceTasks(reduces);
+    
+    JobInProgress jip = new JobInProgress(jid, jc);
+    //unfortunately, we can't set job input size from here.
+    ResourceEstimator re = new ResourceEstimator(jip);
+    
+    for(int i = 0; i < maps / 10 -1; ++i) {
+
+      long estOutSize = re.getEstimatedMapOutputSize();
+      System.out.println(estOutSize);
+      assertEquals(0, estOutSize);
+      
+      TaskStatus ts = new MapTaskStatus();
+      ts.setOutputSize(singleMapOutputSize);
+      RawSplit split = new RawSplit();
+      split.setDataLength(0);
+      TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0);
+      re.updateWithCompletedTask(ts, tip);
+    }
+    
+    assertEquals(2* singleMapOutputSize, re.getEstimatedMapOutputSize());
+    assertEquals(2* singleMapOutputSize * maps / reduces, re.getEstimatedReduceInputSize());
+    
+  }
+
+}