You are viewing a plain-text version of this content. The link to the canonical version (originally a hyperlink on the word "here") was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/02/26 07:30:43 UTC
svn commit: r748035 - in /hadoop/core/branches/branch-0.19: CHANGES.txt
src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java
src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
Author: ddas
Date: Thu Feb 26 06:30:43 2009
New Revision: 748035
URL: http://svn.apache.org/viewvc?rev=748035&view=rev
Log:
HADOOP-5241. Committing this to the 0.19 branch.
Modified:
hadoop/core/branches/branch-0.19/CHANGES.txt
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java
hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=748035&r1=748034&r2=748035&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Thu Feb 26 06:30:43 2009
@@ -13,6 +13,9 @@
HADOOP-5280. Adds a check to prevent a task state transition from FAILED to
any of UNASSIGNED, RUNNING, COMMIT_PENDING or SUCCEEDED. (ddas)
+ HADOOP-5241. Fixes a bug in disk-space resource estimation. Makes the estimation
+ formula linear where blowUp = Total-Output/Total-Input. (Sharad Agarwal via ddas)
+
Release 0.19.1 - 2009-02-23
INCOMPATIBLE CHANGES
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java?rev=748035&r1=748034&r2=748035&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java Thu Feb 26 06:30:43 2009
@@ -34,82 +34,46 @@
private static final Log LOG = LogFactory.getLog(
"org.apache.hadoop.mapred.ResourceEstimator");
+ private long completedMapsInputSize;
+ private long completedMapsOutputSize;
- /**
- * Estimated ratio of output to (input size+1) for map tasks.
- */
- private double mapBlowupRatio;
-
- /**
- * How much relative weight to put on the current estimate.
- * Each completed map has unit weight.
- */
- private double estimateWeight;
+ private int completedMapsUpdates;
final private JobInProgress job;
final private int threshholdToUse;
public ResourceEstimator(JobInProgress job) {
this.job = job;
threshholdToUse = job.desiredMaps()/ 10;
- mapBlowupRatio = 0;
- estimateWeight = 1;
}
+ protected synchronized void updateWithCompletedTask(TaskStatus ts,
+ TaskInProgress tip) {
- /**
- * Have private access methods to abstract away synchro.
- * @return
- */
- private synchronized double getBlowupRatio() {
- return mapBlowupRatio;
- }
-
- private synchronized void setBlowupRatio(double b) {
- mapBlowupRatio = b;
- }
-
- void updateWithCompletedTask(TaskStatus ts, TaskInProgress tip) {
-
//-1 indicates error, which we don't average in.
if(tip.isMapTask() && ts.getOutputSize() != -1) {
- double blowupOnThisTask = ts.getOutputSize() /
- ((double) tip.getMapInputSize() + 1);
-
- LOG.info("measured blowup on " + tip.getTIPId() + " was " +
- ts.getOutputSize() + "/" +(tip.getMapInputSize()+1) + " = "
- + blowupOnThisTask);
-
- double newEstimate;
- synchronized(this) {
- newEstimate = blowupOnThisTask / estimateWeight +
- ((estimateWeight - 1) / estimateWeight) * getBlowupRatio();
- estimateWeight++;
- }
- setBlowupRatio(newEstimate);
-
- LOG.info("new estimate is blowup = " + newEstimate);
+ completedMapsUpdates++;
+
+ completedMapsInputSize+=(tip.getMapInputSize()+1);
+ completedMapsOutputSize+=ts.getOutputSize();
+
+ LOG.info("completedMapsUpdates:"+completedMapsUpdates+" "+
+ "completedMapsInputSize:"+completedMapsInputSize+" " +
+ "completedMapsOutputSize:"+completedMapsOutputSize);
}
}
/**
* @return estimated length of this job's total map output
*/
- protected long getEstimatedTotalMapOutputSize() {
- double estWeight;
- synchronized(this) {
- estWeight = this.estimateWeight;
- }
-
- if(estWeight < threshholdToUse) {
+ protected synchronized long getEstimatedTotalMapOutputSize() {
+ if(completedMapsUpdates < threshholdToUse) {
return 0;
} else {
- double blowup =getBlowupRatio();
long inputSize = job.getInputLength() + job.desiredMaps();
//add desiredMaps() so that randomwriter case doesn't blow up
- long estimate = Math.round(inputSize * blowup * 2.0);
-
- LOG.debug("estimate total map output will be " + estimate +
- " bytes. (blowup = 2*" + blowup + ")");
+ long estimate = Math.round((inputSize *
+ completedMapsOutputSize * 2.0)/completedMapsInputSize);
+ LOG.debug("estimate total map output will be " + estimate);
return estimate;
}
}
Modified: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java?rev=748035&r1=748034&r2=748035&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java (original)
+++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java Thu Feb 26 06:30:43 2009
@@ -36,7 +36,7 @@
//unfortunately, we can't set job input size from here.
ResourceEstimator re = new ResourceEstimator(jip);
- for(int i = 0; i < maps / 10 -1; ++i) {
+ for(int i = 0; i < maps / 10 ; ++i) {
long estOutSize = re.getEstimatedMapOutputSize();
System.out.println(estOutSize);
@@ -49,10 +49,56 @@
TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0);
re.updateWithCompletedTask(ts, tip);
}
+ assertEquals(2* singleMapOutputSize, re.getEstimatedMapOutputSize());
+ assertEquals(2* singleMapOutputSize * maps / reduces, re.getEstimatedReduceInputSize());
+
+ }
+
+ public void testWithNonZeroInput() throws Exception {
+ final int maps = 100;
+ final int reduces = 2;
+ final int singleMapOutputSize = 1000;
+ final int singleMapInputSize = 500;
+ JobConf jc = new JobConf();
+ JobID jid = new JobID("testJT", 0);
+ jc.setNumMapTasks(maps);
+ jc.setNumReduceTasks(reduces);
+
+ JobInProgress jip = new JobInProgress(jid, jc) {
+ long getInputLength() {
+ return singleMapInputSize*desiredMaps();
+ }
+ };
+ ResourceEstimator re = new ResourceEstimator(jip);
+
+ for(int i = 0; i < maps / 10 ; ++i) {
+
+ long estOutSize = re.getEstimatedMapOutputSize();
+ System.out.println(estOutSize);
+ assertEquals(0, estOutSize);
+
+ TaskStatus ts = new MapTaskStatus();
+ ts.setOutputSize(singleMapOutputSize);
+ RawSplit split = new RawSplit();
+ split.setDataLength(singleMapInputSize);
+ TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0);
+ re.updateWithCompletedTask(ts, tip);
+ }
assertEquals(2* singleMapOutputSize, re.getEstimatedMapOutputSize());
assertEquals(2* singleMapOutputSize * maps / reduces, re.getEstimatedReduceInputSize());
+
+ //add one more map task with input size as 0
+ TaskStatus ts = new MapTaskStatus();
+ ts.setOutputSize(singleMapOutputSize);
+ RawSplit split = new RawSplit();
+ split.setDataLength(0);
+ TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0);
+ re.updateWithCompletedTask(ts, tip);
+ long expectedTotalMapOutSize = (singleMapOutputSize*11) *
+ ((maps*singleMapInputSize)+maps)/((singleMapInputSize+1)*10+1);
+ assertEquals(2* expectedTotalMapOutSize/maps, re.getEstimatedMapOutputSize());
}
}