You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/02/23 11:01:03 UTC
svn commit: r746945 - in /hadoop/core/branches/branch-0.20: ./ CHANGES.txt
src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java
src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
Author: ddas
Date: Mon Feb 23 10:01:02 2009
New Revision: 746945
URL: http://svn.apache.org/viewvc?rev=746945&view=rev
Log:
Merge -r 746943:746944 from trunk onto 0.20 branch. Fixes HADOOP-5241.
Modified:
hadoop/core/branches/branch-0.20/ (props changed)
hadoop/core/branches/branch-0.20/CHANGES.txt (contents, props changed)
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java
hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
Propchange: hadoop/core/branches/branch-0.20/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Feb 23 10:01:02 2009
@@ -1,2 +1,2 @@
/hadoop/core/branches/branch-0.19:713112
-/hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274,746902-746903
+/hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274,746902-746903,746944
Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=746945&r1=746944&r2=746945&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Mon Feb 23 10:01:02 2009
@@ -645,6 +645,9 @@
references to completedJobStore outside the block where the JobTracker is locked.
(ddas)
+ HADOOP-5241. Fixes a bug in disk-space resource estimation. Makes the estimation
+ formula linear where blowUp = Total-Output/Total-Input. (Sharad Agarwal via ddas)
+
Release 0.19.1 - Unreleased
IMPROVEMENTS
Propchange: hadoop/core/branches/branch-0.20/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Feb 23 10:01:02 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.18/CHANGES.txt:727226
/hadoop/core/branches/branch-0.19/CHANGES.txt:713112
-/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233,746274,746902-746903
+/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233,746274,746902-746903,746944
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java?rev=746945&r1=746944&r2=746945&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java Mon Feb 23 10:01:02 2009
@@ -34,82 +34,46 @@
private static final Log LOG = LogFactory.getLog(
"org.apache.hadoop.mapred.ResourceEstimator");
+ private long completedMapsInputSize;
+ private long completedMapsOutputSize;
- /**
- * Estimated ratio of output to (input size+1) for map tasks.
- */
- private double mapBlowupRatio;
-
- /**
- * How much relative weight to put on the current estimate.
- * Each completed map has unit weight.
- */
- private double estimateWeight;
+ private int completedMapsUpdates;
final private JobInProgress job;
final private int threshholdToUse;
public ResourceEstimator(JobInProgress job) {
this.job = job;
threshholdToUse = job.desiredMaps()/ 10;
- mapBlowupRatio = 0;
- estimateWeight = 1;
}
+ protected synchronized void updateWithCompletedTask(TaskStatus ts,
+ TaskInProgress tip) {
- /**
- * Have private access methods to abstract away synchro.
- * @return
- */
- private synchronized double getBlowupRatio() {
- return mapBlowupRatio;
- }
-
- private synchronized void setBlowupRatio(double b) {
- mapBlowupRatio = b;
- }
-
- void updateWithCompletedTask(TaskStatus ts, TaskInProgress tip) {
-
//-1 indicates error, which we don't average in.
if(tip.isMapTask() && ts.getOutputSize() != -1) {
- double blowupOnThisTask = ts.getOutputSize() /
- ((double) tip.getMapInputSize() + 1);
-
- LOG.info("measured blowup on " + tip.getTIPId() + " was " +
- ts.getOutputSize() + "/" +(tip.getMapInputSize()+1) + " = "
- + blowupOnThisTask);
-
- double newEstimate;
- synchronized(this) {
- newEstimate = blowupOnThisTask / estimateWeight +
- ((estimateWeight - 1) / estimateWeight) * getBlowupRatio();
- estimateWeight++;
- }
- setBlowupRatio(newEstimate);
-
- LOG.info("new estimate is blowup = " + newEstimate);
+ completedMapsUpdates++;
+
+ completedMapsInputSize+=(tip.getMapInputSize()+1);
+ completedMapsOutputSize+=ts.getOutputSize();
+
+ LOG.info("completedMapsUpdates:"+completedMapsUpdates+" "+
+ "completedMapsInputSize:"+completedMapsInputSize+" " +
+ "completedMapsOutputSize:"+completedMapsOutputSize);
}
}
/**
* @return estimated length of this job's total map output
*/
- protected long getEstimatedTotalMapOutputSize() {
- double estWeight;
- synchronized(this) {
- estWeight = this.estimateWeight;
- }
-
- if(estWeight < threshholdToUse) {
+ protected synchronized long getEstimatedTotalMapOutputSize() {
+ if(completedMapsUpdates < threshholdToUse) {
return 0;
} else {
- double blowup =getBlowupRatio();
long inputSize = job.getInputLength() + job.desiredMaps();
//add desiredMaps() so that randomwriter case doesn't blow up
- long estimate = Math.round(inputSize * blowup * 2.0);
-
- LOG.debug("estimate total map output will be " + estimate +
- " bytes. (blowup = 2*" + blowup + ")");
+ long estimate = Math.round((inputSize *
+ completedMapsOutputSize * 2.0)/completedMapsInputSize);
+ LOG.debug("estimate total map output will be " + estimate);
return estimate;
}
}
Modified: hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java?rev=746945&r1=746944&r2=746945&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java (original)
+++ hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java Mon Feb 23 10:01:02 2009
@@ -36,7 +36,7 @@
//unfortunately, we can't set job input size from here.
ResourceEstimator re = new ResourceEstimator(jip);
- for(int i = 0; i < maps / 10 -1; ++i) {
+ for(int i = 0; i < maps / 10 ; ++i) {
long estOutSize = re.getEstimatedMapOutputSize();
System.out.println(estOutSize);
@@ -49,10 +49,56 @@
TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0);
re.updateWithCompletedTask(ts, tip);
}
+ assertEquals(2* singleMapOutputSize, re.getEstimatedMapOutputSize());
+ assertEquals(2* singleMapOutputSize * maps / reduces, re.getEstimatedReduceInputSize());
+
+ }
+
+ public void testWithNonZeroInput() throws Exception {
+ final int maps = 100;
+ final int reduces = 2;
+ final int singleMapOutputSize = 1000;
+ final int singleMapInputSize = 500;
+ JobConf jc = new JobConf();
+ JobID jid = new JobID("testJT", 0);
+ jc.setNumMapTasks(maps);
+ jc.setNumReduceTasks(reduces);
+
+ JobInProgress jip = new JobInProgress(jid, jc) {
+ long getInputLength() {
+ return singleMapInputSize*desiredMaps();
+ }
+ };
+ ResourceEstimator re = new ResourceEstimator(jip);
+
+ for(int i = 0; i < maps / 10 ; ++i) {
+
+ long estOutSize = re.getEstimatedMapOutputSize();
+ System.out.println(estOutSize);
+ assertEquals(0, estOutSize);
+
+ TaskStatus ts = new MapTaskStatus();
+ ts.setOutputSize(singleMapOutputSize);
+ RawSplit split = new RawSplit();
+ split.setDataLength(singleMapInputSize);
+ TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0);
+ re.updateWithCompletedTask(ts, tip);
+ }
assertEquals(2* singleMapOutputSize, re.getEstimatedMapOutputSize());
assertEquals(2* singleMapOutputSize * maps / reduces, re.getEstimatedReduceInputSize());
+
+ //add one more map task with input size as 0
+ TaskStatus ts = new MapTaskStatus();
+ ts.setOutputSize(singleMapOutputSize);
+ RawSplit split = new RawSplit();
+ split.setDataLength(0);
+ TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0);
+ re.updateWithCompletedTask(ts, tip);
+ long expectedTotalMapOutSize = (singleMapOutputSize*11) *
+ ((maps*singleMapInputSize)+maps)/((singleMapInputSize+1)*10+1);
+ assertEquals(2* expectedTotalMapOutSize/maps, re.getEstimatedMapOutputSize());
}
}