You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:48:14 UTC
svn commit: r1077722 - in
/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred:
CapacitySchedulerConf.java CapacityTaskScheduler.java
Author: omalley
Date: Fri Mar 4 04:48:13 2011
New Revision: 1077722
URL: http://svn.apache.org/viewvc?rev=1077722&view=rev
Log:
commit cefe153b23be6d76c1b7803bb0af0290e23f93aa
Author: Arun C Murthy <ac...@apache.org>
Date: Tue Sep 21 12:11:04 2010 -0700
Added a knob to control off-switch scheduling.
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=1077722&r1=1077721&r2=1077722&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Fri Mar 4 04:48:13 2011
@@ -109,6 +109,8 @@ class CapacitySchedulerConf {
static final int DEFAULT_MAX_SYSTEM_JOBS = 5000;
+ static final int DEFAULT_MAX_TASKS_TO_SCHEDULE_AFTER_OFFSWITCH = 0;
+
/**
* Create a new Capacity scheduler conf.
* This method reads from the default configuration file mentioned in
@@ -509,7 +511,7 @@ class CapacitySchedulerConf {
public int getMaxTasksPerHeartbeat() {
return rmConf.getInt(
"mapred.capacity-scheduler.maximum-tasks-per-heartbeat",
- Integer.MAX_VALUE);
+ Short.MAX_VALUE);
}
/**
@@ -521,4 +523,33 @@ class CapacitySchedulerConf {
rmConf.setInt("mapred.capacity-scheduler.maximum-tasks-per-heartbeat",
maxTasksPerHeartbeat);
}
+
+ /**
+ * Get the maximum number of tasks to schedule, per heartbeat, after an
+ * off-switch task has been assigned.
+ *
+ * @return the maximum number of tasks to schedule, per heartbeat, after an
+ * off-switch task has been assigned
+ */
+ public int getMaxTasksToAssignAfterOffSwitch() {
+ return rmConf.getInt(
+ "mapred.capacity-scheduler.maximum-tasks-after-offswitch",
+ DEFAULT_MAX_TASKS_TO_SCHEDULE_AFTER_OFFSWITCH);
+ }
+
+ /**
+ * Set the maximum number of tasks to schedule, per heartbeat, after an
+ * off-switch task has been assigned.
+ *
+ * @param maxTasksToAssignAfterOffSwitch the maximum number of tasks to
+ * schedule, per heartbeat, after an
+ * off-switch task has been assigned
+ */
+ public void setMaxTasksToAssignAfterOffSwitch(
+ int maxTasksToAssignAfterOffSwitch) {
+ rmConf.setInt(
+ "mapred.capacity-scheduler.maximum-tasks-after-offswitch",
+ Integer.MAX_VALUE);
+
+ }
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1077722&r1=1077721&r2=1077722&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Mar 4 04:48:13 2011
@@ -654,6 +654,7 @@ class CapacityTaskScheduler extends Task
private long limitMaxMemForReduceTasks;
private volatile int maxTasksPerHeartbeat;
+ private volatile int maxTasksToAssignAfterOffSwitch;
public CapacityTaskScheduler() {
this(new Clock());
@@ -880,7 +881,10 @@ class CapacityTaskScheduler extends Task
mapScheduler.initialize(queueInfoMap);
reduceScheduler.initialize(queueInfoMap);
+ // scheduling tunables
maxTasksPerHeartbeat = schedConf.getMaxTasksPerHeartbeat();
+ maxTasksToAssignAfterOffSwitch =
+ schedConf.getMaxTasksToAssignAfterOffSwitch();
}
Map<String, CapacitySchedulerQueue>
@@ -1064,6 +1068,7 @@ class CapacityTaskScheduler extends Task
throws IOException {
int availableSlots = maxMapSlots - currentMapSlots;
boolean assignOffSwitch = true;
+ int tasksToAssignAfterOffSwitch = this.maxTasksToAssignAfterOffSwitch;
while (availableSlots > 0) {
mapScheduler.sortQueues();
TaskLookupResult tlr = mapScheduler.assignTasks(taskTracker,
@@ -1091,6 +1096,19 @@ class CapacityTaskScheduler extends Task
assignOffSwitch = false;
}
+ // Respect limits on #tasks to assign after an off-switch task is assigned
+ if (!assignOffSwitch) {
+ if (tasksToAssignAfterOffSwitch == 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Hit limit of max tasks after off-switch: " +
+ this.maxTasksToAssignAfterOffSwitch + " after " +
+ tasks.size() + " maps.");
+ }
+ return;
+ }
+ --tasksToAssignAfterOffSwitch;
+ }
+
// Assigned some slots
availableSlots -= t.getNumSlotsRequired();