You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink ("here") that did not survive conversion to plain text.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:40:52 UTC
svn commit: r1077655 - in /hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred: CapacitySchedulerConf.java CapacitySchedulerQueue.java CapacityTaskScheduler.java
Author: omalley
Date: Fri Mar 4 04:40:52 2011
New Revision: 1077655
URL: http://svn.apache.org/viewvc?rev=1077655&view=rev
Log:
commit 6207d0c9e9039448a367080533398e2edf3dbc69
Author: Arun C Murthy <ac...@apache.org>
Date: Thu Aug 19 10:50:37 2010 -0700
MAPREDUCE-1872. Fixed a corner case where user limits were applied unfairly, and added a knob to limit the number of tasks scheduled per heartbeat.
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=1077655&r1=1077654&r2=1077655&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Fri Mar 4 04:40:52 2011
@@ -501,4 +501,24 @@ class CapacitySchedulerConf {
rmConf.setInt(
"mapred.capacity-scheduler.init-worker-threads", poolSize);
}
+
+ /**
+ * Get the maximum number of tasks which can be scheduled in a heartbeat.
+ * @return the maximum number of tasks which can be scheduled in a heartbeat
+ */
+ public int getMaxTasksPerHeartbeat() {
+ return rmConf.getInt(
+ "mapred.capacity-scheduler.maximum-tasks-per-heartbeat",
+ Integer.MAX_VALUE);
+ }
+
+ /**
+ * Set the maximum number of tasks which can be scheduled in a heartbeat
+ * @param maxTasksPerHeartbeat the maximum number of tasks which can be
+ * scheduled in a heartbeat
+ */
+ public void setMaxTasksPerHeartbeat(int maxTasksPerHeartbeat) {
+ rmConf.setInt("mapred.capacity-scheduler.maximum-tasks-per-heartbeat",
+ maxTasksPerHeartbeat);
+ }
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java?rev=1077655&r1=1077654&r2=1077655&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java Fri Mar 4 04:40:52 2011
@@ -22,8 +22,10 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
@@ -69,6 +71,9 @@ class CapacitySchedulerQueue {
//in cluster at any given time.
private int maxCapacity = -1;
+ // Active users
+ Set<String> users = new HashSet<String>();
+
/**
* for each user, we need to keep track of number of slots occupied by
* running tasks
@@ -82,6 +87,7 @@ class CapacitySchedulerQueue {
void reset() {
numRunningTasks = 0;
numSlotsOccupied = 0;
+ users.clear();
numSlotsOccupiedByUser.clear();
}
@@ -120,6 +126,13 @@ class CapacitySchedulerQueue {
}
/**
+ * @return number of active users
+ */
+ int getNumActiveUsers() {
+ return users.size();
+ }
+
+ /**
* return information about the tasks
*/
@Override
@@ -184,12 +197,15 @@ class CapacitySchedulerQueue {
}
}
- void updateSlotsUsage(String user, int numRunningTasks, int numSlotsOccupied) {
+ void updateSlotsUsage(String user, int pendingTasks, int numRunningTasks, int numSlotsOccupied) {
this.numRunningTasks += numRunningTasks;
this.numSlotsOccupied += numSlotsOccupied;
Integer i = this.numSlotsOccupiedByUser.get(user);
int slots = numSlotsOccupied + ((i == null) ? 0 : i.intValue());
this.numSlotsOccupiedByUser.put(user, slots);
+ if (pendingTasks > 0) {
+ users.add(user);
+ }
}
}
@@ -489,6 +505,16 @@ class CapacitySchedulerQueue {
throw new IllegalArgumentException("Illegal taskType=" + taskType);
}
+ int getNumActiveUsersByTaskType(TaskType taskType) {
+ if (taskType == TaskType.MAP) {
+ return mapSlots.getNumActiveUsers();
+ } else if (taskType == TaskType.REDUCE) {
+ return reduceSlots.getNumActiveUsers();
+ }
+
+ throw new IllegalArgumentException("Illegal taskType=" + taskType);
+ }
+
/**
* A new job is added to the
* @param job
@@ -553,12 +579,14 @@ class CapacitySchedulerQueue {
* @param numRunningTasks
* @param numSlotsOccupied
*/
- void update(TaskType type, String user,
+ void update(TaskType type, JobInProgress job, String user,
int numRunningTasks, int numSlotsOccupied) {
if (type == TaskType.MAP) {
- mapSlots.updateSlotsUsage(user, numRunningTasks, numSlotsOccupied);
+ mapSlots.updateSlotsUsage(user, job.pendingMaps(),
+ numRunningTasks, numSlotsOccupied);
} else if (type == TaskType.REDUCE) {
- reduceSlots.updateSlotsUsage(user, numRunningTasks, numSlotsOccupied);
+ reduceSlots.updateSlotsUsage(user, job.pendingReduces(),
+ numRunningTasks, numSlotsOccupied);
}
}
@@ -611,9 +639,9 @@ class CapacitySchedulerQueue {
numRunningReduceSlots,
numReservedReduceSlotsForThisJob));
- update(TaskType.MAP, j.getProfile().getUser(),
+ update(TaskType.MAP, j, j.getProfile().getUser(),
numMapsRunningForThisJob, numMapSlotsForThisJob);
- update(TaskType.REDUCE, j.getProfile().getUser(),
+ update(TaskType.REDUCE, j, j.getProfile().getUser(),
numReducesRunningForThisJob, numReduceSlotsForThisJob);
if (LOG.isDebugEnabled()) {
@@ -1127,9 +1155,13 @@ class CapacitySchedulerQueue {
// queue's configured capacity * user-limit-factor.
// Also, the queue's configured capacity should be higher than
// queue-hard-limit * ulMin
+
+ // All users in this queue might not need any slots of type 'taskType'
+ int activeUsers = Math.max(1, getNumActiveUsersByTaskType(taskType));
+
int limit =
Math.min(
- Math.max(divideAndCeil(currentCapacity, numJobsByUser.size()),
+ Math.max(divideAndCeil(currentCapacity, activeUsers),
divideAndCeil(ulMin*currentCapacity, 100)),
(int)(queueCapacity * ulMinFactor)
);
@@ -1138,9 +1170,11 @@ class CapacitySchedulerQueue {
limit) {
if (LOG.isDebugEnabled()) {
LOG.debug("User " + user + " is over limit for queue=" + queueName +
+ " queueCapacity=" + queueCapacity +
" num slots occupied=" + getNumSlotsOccupiedByUser(user, taskType) +
" limit=" + limit +" numSlotsRequested=" + numSlotsRequested +
- " currentCapacity=" + currentCapacity);
+ " currentCapacity=" + currentCapacity +
+ " numActiveUsers=" + getNumActiveUsersByTaskType(taskType));
}
return false;
}
@@ -1303,4 +1337,4 @@ class CapacitySchedulerQueue {
return true;
}
-}
\ No newline at end of file
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1077655&r1=1077654&r2=1077655&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Mar 4 04:40:52 2011
@@ -652,6 +652,8 @@ class CapacityTaskScheduler extends Task
private long memSizeForReduceSlotOnJT;
private long limitMaxMemForMapTasks;
private long limitMaxMemForReduceTasks;
+
+ private volatile int maxTasksPerHeartbeat;
public CapacityTaskScheduler() {
this(new Clock());
@@ -877,6 +879,8 @@ class CapacityTaskScheduler extends Task
// let our mgr objects know about the queues
mapScheduler.initialize(queueInfoMap);
reduceScheduler.initialize(queueInfoMap);
+
+ maxTasksPerHeartbeat = schedConf.getMaxTasksPerHeartbeat();
}
Map<String, CapacitySchedulerQueue>
@@ -1077,6 +1081,10 @@ class CapacityTaskScheduler extends Task
tasks.add(t);
+ if (tasks.size() >= maxTasksPerHeartbeat) {
+ return;
+ }
+
if (TaskLookupResult.LookUpStatus.OFF_SWITCH_TASK_FOUND ==
tlr.getLookUpStatus()) {
// Atmost 1 off-switch task per-heartbeat
@@ -1089,7 +1097,7 @@ class CapacityTaskScheduler extends Task
// Update the queue
CapacitySchedulerQueue queue =
queueInfoMap.get(job.getProfile().getQueueName());
- queue.update(TaskType.MAP,
+ queue.update(TaskType.MAP, job,
job.getProfile().getUser(), 1, t.getNumSlotsRequired());
}
}