You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:14:04 UTC
svn commit: r1077430 -
/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java
Author: omalley
Date: Fri Mar 4 04:14:04 2011
New Revision: 1077430
URL: http://svn.apache.org/viewvc?rev=1077430&view=rev
Log:
commit b0b5aca14b99ded8cd9f2d0d31f555f5a4a73197
Author: Hong Tang <ht...@yahoo-inc.com>
Date: Tue Apr 27 14:52:55 2010 -0700
MAPREDUCE-1687. Stress submission policy does not always stress the cluster. (htang)
From https://issues.apache.org/jira/secure/attachment/12442692/mr-1687-yhadoop-20.1xx-20100423-2.patch.
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1687. Stress submission policy does not always stress the
+ cluster. (htang)
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java?rev=1077430&r1=1077429&r2=1077430&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java Fri Mar 4 04:14:04 2011
@@ -36,8 +36,8 @@ import java.util.concurrent.locks.Condit
public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
public static final Log LOG = LogFactory.getLog(StressJobFactory.class);
- private LoadStatus loadStatus = new LoadStatus();
- private final Condition overloaded = this.lock.newCondition();
+ private final LoadStatus loadStatus = new LoadStatus();
+ private final Condition condUnderloaded = this.lock.newCondition();
/**
* The minimum ratio between pending+running map tasks (aka. incomplete map
* tasks) and cluster map slot capacity for us to consider the cluster is
@@ -47,6 +47,27 @@ public class StressJobFactory extends Jo
static final float OVERLOAD_MAPTASK_MAPSLOT_RATIO = 2.0f;
/**
+ * The minimum ratio between pending+running reduce tasks (a.k.a. incomplete
+ * reduce tasks) and cluster reduce slot capacity for us to consider the
+ * cluster overloaded. Running reduces are counted only partially:
+ * for example, a 40% completed reduce is counted as 0.6 reduce tasks in our
+ * calculation.
+ */
+ static final float OVERLOAD_REDUCETASK_REDUCESLOT_RATIO = 2.5f;
+
+ /**
+ * The maximum share of the cluster's mapslot capacity that can be counted
+ * toward a job's incomplete map tasks in overload calculation.
+ */
+ static final float MAX_MAPSLOT_SHARE_PER_JOB=0.1f;
+
+ /**
+ * The maximum share of the cluster's reduceslot capacity that can be counted
+ * toward a job's incomplete reduce tasks in overload calculation.
+ */
+ static final float MAX_REDUCESLOT_SHARE_PER_JOB=0.1f;
+
+ /**
* Creating a new instance does not start the thread.
*
* @param submitter Component to which deserialized jobs are passed
@@ -63,11 +84,6 @@ public class StressJobFactory extends Jo
throws IOException {
super(
submitter, jobProducer, scratch, conf, startFlag, resolver);
-
- //Setting isOverloaded as true , now JF would wait for atleast first
- //set of ClusterStats based on which it can decide how many job it has
- //to submit.
- this.loadStatus.isOverloaded = true;
}
public Thread createReaderThread() {
@@ -104,33 +120,37 @@ public class StressJobFactory extends Jo
while (!Thread.currentThread().isInterrupted()) {
lock.lock();
try {
- while (loadStatus.isOverloaded) {
+ while (loadStatus.overloaded()) {
//Wait while JT is overloaded.
try {
- overloaded.await();
+ condUnderloaded.await();
} catch (InterruptedException ie) {
return;
}
}
- int noOfSlotsAvailable = loadStatus.numSlotsBackfill;
- LOG.info("No of slots to be backfilled are " + noOfSlotsAvailable);
-
- for (int i = 0; i < noOfSlotsAvailable; i++) {
+ while (!loadStatus.overloaded()) {
try {
final JobStory job = getNextJobFiltered();
if (null == job) {
return;
}
- //TODO: We need to take care of scenario when one map takes more
- //than 1 slot.
- i += job.getNumberMaps();
-
+
submitter.add(
jobCreator.createGridmixJob(
conf, 0L, job, scratch, userResolver.getTargetUgi(
UserGroupInformation.createRemoteUser(
job.getUser())), sequence.getAndIncrement()));
+ // TODO: We need to take care of the scenario where one map/reduce
+ // takes more than 1 slot.
+ loadStatus.mapSlotsBackfill -=
+ calcEffectiveIncompleteMapTasks(
+ loadStatus.mapSlotCapacity, job.getNumberMaps(), 0.0f);
+ loadStatus.reduceSlotsBackfill -=
+ calcEffectiveIncompleteReduceTasks(
+ loadStatus.reduceSlotCapacity, job.getNumberReduces(),
+ 0.0f);
+ --loadStatus.numJobsBackfill;
} catch (IOException e) {
LOG.error("Error while submitting the job ", e);
error = e;
@@ -166,12 +186,33 @@ public class StressJobFactory extends Jo
} catch (IOException e) {
LOG.error("Couldn't get the new Status",e);
}
- overloaded.signalAll();
+ if (!loadStatus.overloaded()) {
+ condUnderloaded.signalAll();
+ }
} finally {
lock.unlock();
}
}
+ static float calcEffectiveIncompleteMapTasks(int mapSlotCapacity,
+ int numMaps, float mapProgress) {
+ float maxEffIncompleteMapTasks = Math.max(1.0f, mapSlotCapacity
+ * MAX_MAPSLOT_SHARE_PER_JOB);
+ float mapProgressAdjusted = Math.max(Math.min(mapProgress, 1.0f), 0.0f);
+ return Math.min(maxEffIncompleteMapTasks, numMaps
+ * (1.0f - mapProgressAdjusted));
+ }
+
+ static float calcEffectiveIncompleteReduceTasks(int reduceSlotCapacity,
+ int numReduces, float reduceProgress) {
+ float maxEffIncompleteReduceTasks = Math.max(1.0f, reduceSlotCapacity
+ * MAX_REDUCESLOT_SHARE_PER_JOB);
+ float reduceProgressAdjusted = Math.max(Math.min(reduceProgress, 1.0f),
+ 0.0f);
+ return Math.min(maxEffIncompleteReduceTasks, numReduces
+ * (1.0f - reduceProgressAdjusted));
+ }
+
/**
* We try to use some light-weight mechanism to determine cluster load.
*
@@ -181,62 +222,98 @@ public class StressJobFactory extends Jo
*/
private void checkLoadAndGetSlotsToBackfill(
ClusterStats stats, ClusterStatus clusterStatus) throws IOException {
- // If there are more jobs than number of task trackers, we assume the
- // cluster is overloaded.
- if (stats.getNumRunningJob() >= clusterStatus.getTaskTrackers()) {
+ loadStatus.mapSlotCapacity = clusterStatus.getMaxMapTasks();
+ loadStatus.reduceSlotCapacity = clusterStatus.getMaxReduceTasks();
+
+
+ loadStatus.numJobsBackfill = clusterStatus.getTaskTrackers()
+ - stats.getNumRunningJob();
+ if (loadStatus.numJobsBackfill <= 0) {
if (LOG.isDebugEnabled()) {
- LOG.debug(
- System.currentTimeMillis() + " Overloaded is " +
- Boolean.TRUE.toString() + " #runningJobs >= taskTrackerCount (" +
- stats.getNumRunningJob() + " >= " +
- clusterStatus.getTaskTrackers() + " )\n");
- }
- loadStatus.isOverloaded = true;
- loadStatus.numSlotsBackfill = 0;
- return;
+ LOG.debug(System.currentTimeMillis() + " Overloaded is "
+ + Boolean.TRUE.toString() + " NumJobsBackfill is "
+ + loadStatus.numJobsBackfill);
+ }
+ return; // stop calculation because we know it is overloaded.
}
float incompleteMapTasks = 0; // include pending & running map tasks.
for (JobStats job : ClusterStats.getRunningJobStats()) {
float mapProgress = job.getJob().mapProgress();
int noOfMaps = job.getNoOfMaps();
- incompleteMapTasks += (1 - Math.min(mapProgress,1.0))* noOfMaps;
+ incompleteMapTasks += calcEffectiveIncompleteMapTasks(clusterStatus
+ .getMaxMapTasks(), noOfMaps, mapProgress);
+ }
+ loadStatus.mapSlotsBackfill = (int) (OVERLOAD_MAPTASK_MAPSLOT_RATIO
+ * clusterStatus.getMaxMapTasks() - incompleteMapTasks);
+ if (loadStatus.mapSlotsBackfill <= 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(System.currentTimeMillis() + " Overloaded is "
+ + Boolean.TRUE.toString() + " MapSlotsBackfill is "
+ + loadStatus.mapSlotsBackfill);
+ }
+ return; // stop calculation because we know it is overloaded.
}
- float overloadedThreshold =
- OVERLOAD_MAPTASK_MAPSLOT_RATIO * clusterStatus.getMaxMapTasks();
- boolean overloaded = incompleteMapTasks > overloadedThreshold;
- String relOp = (overloaded) ? ">" : "<=";
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- System.currentTimeMillis() + " Overloaded is " + Boolean.toString(
- overloaded) + " incompleteMapTasks " + relOp + " " +
- OVERLOAD_MAPTASK_MAPSLOT_RATIO + "*mapSlotCapacity" + "(" +
- incompleteMapTasks + " " + relOp + " " +
- OVERLOAD_MAPTASK_MAPSLOT_RATIO + "*" +
- clusterStatus.getMaxMapTasks() + ")");
- }
- if (overloaded) {
- loadStatus.isOverloaded = true;
- loadStatus.numSlotsBackfill = 0;
- } else {
- loadStatus.isOverloaded = false;
- loadStatus.numSlotsBackfill =
- (int) (overloadedThreshold - incompleteMapTasks);
+ float incompleteReduceTasks = 0; // include pending & running reduce tasks.
+ for (JobStats job : ClusterStats.getRunningJobStats()) {
+ int noOfReduces = job.getJob().getNumReduceTasks();
+ if (noOfReduces > 0) {
+ float reduceProgress = job.getJob().reduceProgress();
+ incompleteReduceTasks += calcEffectiveIncompleteReduceTasks(
+ clusterStatus.getMaxReduceTasks(), noOfReduces, reduceProgress);
+ }
+ }
+ loadStatus.reduceSlotsBackfill = (int) (OVERLOAD_REDUCETASK_REDUCESLOT_RATIO
+ * clusterStatus.getMaxReduceTasks() - incompleteReduceTasks);
+ if (loadStatus.reduceSlotsBackfill <= 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(System.currentTimeMillis() + " Overloaded is "
+ + Boolean.TRUE.toString() + " ReduceSlotsBackfill is "
+ + loadStatus.reduceSlotsBackfill);
+ }
+ return; // stop calculation because we know it is overloaded.
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Current load Status is " + loadStatus);
+ LOG.debug(System.currentTimeMillis() + " Overloaded is "
+ + Boolean.FALSE.toString() + "Current load Status is " + loadStatus);
}
}
static class LoadStatus {
- boolean isOverloaded = false;
- int numSlotsBackfill = -1;
+ int mapSlotsBackfill;
+ int mapSlotCapacity;
+ int reduceSlotsBackfill;
+ int reduceSlotCapacity;
+ int numJobsBackfill;
+ /**
+ * Construct the LoadStatus in an unknown state - assuming the cluster is
+ * overloaded by setting every backfill counter to 0.
+ */
+ LoadStatus() {
+ mapSlotsBackfill = 0;
+ reduceSlotsBackfill = 0;
+ numJobsBackfill = 0;
+
+ mapSlotCapacity = -1;
+ reduceSlotCapacity = -1;
+ }
+
+ public boolean overloaded() {
+ return (mapSlotsBackfill <= 0) || (reduceSlotsBackfill <= 0)
+ || (numJobsBackfill <= 0);
+ }
+
public String toString() {
- return " is Overloaded " + isOverloaded + " no of slots available " +
- numSlotsBackfill;
+ return " Overloaded = " + overloaded()
+ + ", MapSlotBackfill = " + mapSlotsBackfill
+ + ", MapSlotCapacity = " + mapSlotCapacity
+ + ", ReduceSlotBackfill = " + reduceSlotsBackfill
+ + ", ReduceSlotCapacity = " + reduceSlotCapacity
+ + ", NumJobsBackfill = " + numJobsBackfill
+ ;
}
}