You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2009/02/20 12:12:48 UTC
svn commit: r746206 - in /hadoop/core/trunk: CHANGES.txt
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
Author: yhemanth
Date: Fri Feb 20 11:12:48 2009
New Revision: 746206
URL: http://svn.apache.org/viewvc?rev=746206&view=rev
Log:
HADOOP-5214. Fixes a ConcurrentModificationException while the Fairshare Scheduler accesses the tasktrackers stored by the JobTracker. Contributed by Rahul Kumar Singh.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=746206&r1=746205&r2=746206&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Feb 20 11:12:48 2009
@@ -804,6 +804,10 @@
HADOOP-5269. Fixes a problem to do with tasktracker holding on to FAILED_UNCLEAN
or KILLED_UNCLEAN tasks forever. (Amareshwari Sriramadasu via ddas)
+ HADOOP-5214. Fixes a ConcurrentModificationException while the Fairshare
+ Scheduler accesses the tasktrackers stored by the JobTracker.
+ (Rahul Kumar Singh via yhemanth)
+
Release 0.19.1 - Unreleased
IMPROVEMENTS
Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=746206&r1=746205&r2=746206&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Fri Feb 20 11:12:48 2009
@@ -233,11 +233,12 @@
runnableReduces += runnableTasks(job, TaskType.REDUCE);
}
+ ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
// Compute total map/reduce slots
// In the future we can precompute this if the Scheduler becomes a
// listener of tracker join/leave events.
- int totalMapSlots = getTotalSlots(TaskType.MAP);
- int totalReduceSlots = getTotalSlots(TaskType.REDUCE);
+ int totalMapSlots = getTotalSlots(TaskType.MAP, clusterStatus);
+ int totalReduceSlots = getTotalSlots(TaskType.REDUCE, clusterStatus);
// Scan to see whether any job needs to run a map, then a reduce
ArrayList<Task> tasks = new ArrayList<Task>();
@@ -331,31 +332,36 @@
* fair shares, deficits, minimum slot allocations, and numbers of running
* and needed tasks of each type.
*/
- protected synchronized void update() {
+ protected void update() {
+ // Making the locking more granular so that clusterStatus can be fetched from the JobTracker.
+ ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
+ // Got clusterStatus, so acquire the scheduler lock now
// Remove non-running jobs
- List<JobInProgress> toRemove = new ArrayList<JobInProgress>();
- for (JobInProgress job: infos.keySet()) {
- int runState = job.getStatus().getRunState();
- if (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED
+ synchronized(this){
+ List<JobInProgress> toRemove = new ArrayList<JobInProgress>();
+ for (JobInProgress job: infos.keySet()) {
+ int runState = job.getStatus().getRunState();
+ if (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED
|| runState == JobStatus.KILLED) {
- toRemove.add(job);
+ toRemove.add(job);
+ }
}
+ for (JobInProgress job: toRemove) {
+ infos.remove(job);
+ poolMgr.removeJob(job);
+ }
+ // Update running jobs with deficits since last update, and compute new
+ // slot allocations, weight, shares and task counts
+ long now = clock.getTime();
+ long timeDelta = now - lastUpdateTime;
+ updateDeficits(timeDelta);
+ updateRunnability();
+ updateTaskCounts();
+ updateWeights();
+ updateMinSlots();
+ updateFairShares(clusterStatus);
+ lastUpdateTime = now;
}
- for (JobInProgress job: toRemove) {
- infos.remove(job);
- poolMgr.removeJob(job);
- }
- // Update running jobs with deficits since last update, and compute new
- // slot allocations, weight, shares and task counts
- long now = clock.getTime();
- long timeDelta = now - lastUpdateTime;
- updateDeficits(timeDelta);
- updateRunnability();
- updateTaskCounts();
- updateWeights();
- updateMinSlots();
- updateFairShares();
- lastUpdateTime = now;
}
private void updateDeficits(long timeDelta) {
@@ -594,7 +600,7 @@
return slotsLeft;
}
- private void updateFairShares() {
+ private void updateFairShares(ClusterStatus clusterStatus) {
// Clear old fairShares
for (JobInfo info: infos.values()) {
info.mapFairShare = 0;
@@ -618,7 +624,7 @@
jobsLeft.add(info);
}
}
- double slotsLeft = getTotalSlots(type);
+ double slotsLeft = getTotalSlots(type, clusterStatus);
while (!jobsLeft.isEmpty()) {
double totalWeight = 0;
for (JobInfo info: jobsLeft) {
@@ -697,13 +703,9 @@
return poolMgr;
}
- public int getTotalSlots(TaskType type) {
- int slots = 0;
- for (TaskTrackerStatus tt: taskTrackerManager.taskTrackers()) {
- slots += (type == TaskType.MAP ?
- tt.getMaxMapTasks() : tt.getMaxReduceTasks());
- }
- return slots;
+ private int getTotalSlots(TaskType type, ClusterStatus clusterStatus) {
+ return (type == TaskType.MAP ?
+ clusterStatus.getMaxMapTasks() : clusterStatus.getMaxReduceTasks());
}
public boolean getUseFifo() {