You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2009/05/26 15:30:38 UTC
svn commit: r778696 [1/2] - in /hadoop/core/trunk: ./ conf/
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
src/mapred/ src/mapred/org/apache/hadoop/mapred/ src/test/ma...
Author: yhemanth
Date: Tue May 26 13:30:37 2009
New Revision: 778696
URL: http://svn.apache.org/viewvc?rev=778696&view=rev
Log:
HADOOP-5881. Simplify memory monitoring and scheduling related configuration. Contributed by Vinod Kumar Vavilapalli.
Added:
hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/conf/capacity-scheduler.xml.template
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
hadoop/core/trunk/src/mapred/mapred-default.xml
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java
hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue May 26 13:30:37 2009
@@ -686,6 +686,9 @@
HADOOP-5726. Remove pre-emption from capacity scheduler code base.
(Rahul Kumar Singh via yhemanth)
+ HADOOP-5881. Simplify memory monitoring and scheduling related
+ configuration. (Vinod Kumar Vavilapalli via yhemanth)
+
NEW FEATURES
IMPROVEMENTS
Modified: hadoop/core/trunk/conf/capacity-scheduler.xml.template
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/capacity-scheduler.xml.template?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/conf/capacity-scheduler.xml.template (original)
+++ hadoop/core/trunk/conf/capacity-scheduler.xml.template Tue May 26 13:30:37 2009
@@ -56,34 +56,6 @@
account in scheduling decisions by default in a job queue.
</description>
</property>
-
- <property>
- <name>mapred.capacity-scheduler.task.default-pmem-percentage-in-vmem</name>
- <value>-1</value>
- <description>A percentage (float) of the default VM limit for jobs
- (mapred.task.default.maxvm). This is the default RAM task-limit
- associated with a task. Unless overridden by a job's setting, this
- number defines the RAM task-limit.
-
- If this property is missing, or set to an invalid value, scheduling
- based on physical memory, RAM, is disabled.
- </description>
- </property>
-
- <property>
- <name>mapred.capacity-scheduler.task.limit.maxpmem</name>
- <value>-1</value>
- <description>Configuration that provides an upper limit on the maximum
- physical memory that can be specified by a job. The job configuration
- mapred.task.maxpmem should be less than this value. If not, the job will
- be rejected by the scheduler.
-
- If it is set to -1, scheduler will not consider physical memory for
- scheduling even if virtual memory based scheduling is enabled(by setting
- valid values for both mapred.task.default.maxvmem and
- mapred.task.limit.maxvmem).
- </description>
- </property>
<property>
<name>mapred.capacity-scheduler.default-minimum-user-limit-percent</name>
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Tue May 26 13:30:37 2009
@@ -351,44 +351,4 @@
rmConf.setInt(
"mapred.capacity-scheduler.init-worker-threads", poolSize);
}
-
- /**
- * Get the upper limit on the maximum physical memory that can be specified by
- * a job.
- *
- * @return upper limit for max pmem for tasks.
- */
- public long getLimitMaxPmemForTasks() {
- return rmConf.getLong(UPPER_LIMIT_ON_TASK_PMEM_PROPERTY,
- JobConf.DISABLED_MEMORY_LIMIT);
- }
-
- /**
- * Get the upper limit on the maximum physical memory that can be specified by
- * a job.
- *
- * @param value
- */
- public void setLimitMaxPmemForTasks(long value) {
- rmConf.setLong(UPPER_LIMIT_ON_TASK_PMEM_PROPERTY, value);
- }
-
- /**
- * Get cluster-wide default percentage of pmem in vmem.
- *
- * @return cluster-wide default percentage of pmem in vmem.
- */
- public float getDefaultPercentOfPmemInVmem() {
- return rmConf.getFloat(DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY,
- JobConf.DISABLED_MEMORY_LIMIT);
- }
-
- /**
- * Set cluster-wide default percentage of pmem in vmem.
- *
- * @param value
- */
- public void setDefaultPercentOfPmemInVmem(float value) {
- rmConf.setFloat(DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY, value);
- }
}
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Tue May 26 13:30:37 2009
@@ -278,11 +278,7 @@
/** our TaskScheduler object */
protected CapacityTaskScheduler scheduler;
- // can be replaced with a global type, if we have one
- protected static enum TYPE {
- MAP, REDUCE
- }
- protected TYPE type = null;
+ protected CapacityTaskScheduler.TYPE type = null;
abstract Task obtainNewTask(TaskTrackerStatus taskTracker,
JobInProgress job) throws IOException;
@@ -413,7 +409,8 @@
//If this job meets memory requirements. Ask the JobInProgress for
//a task to be scheduled on the task tracker.
//if we find a job then we pass it on.
- if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) {
+ if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
+ taskTracker)) {
// We found a suitable job. Get task from it.
Task t = obtainNewTask(taskTracker, j);
//if there is a task return it immediately.
@@ -422,6 +419,8 @@
return TaskLookupResult.getTaskFoundResult(t);
} else {
//skip to the next job in the queue.
+ LOG.debug("Job " + j.getJobID().toString()
+ + " returned no tasks of type " + type);
continue;
}
} else {
@@ -456,7 +455,8 @@
if (j.getStatus().getRunState() != JobStatus.RUNNING) {
continue;
}
- if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) {
+ if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
+ taskTracker)) {
// We found a suitable job. Get task from it.
Task t = obtainNewTask(taskTracker, j);
//if there is a task return it immediately.
@@ -561,7 +561,7 @@
private static class MapSchedulingMgr extends TaskSchedulingMgr {
MapSchedulingMgr(CapacityTaskScheduler dad) {
super(dad);
- type = TaskSchedulingMgr.TYPE.MAP;
+ type = CapacityTaskScheduler.TYPE.MAP;
queueComparator = mapComparator;
}
Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job)
@@ -603,7 +603,7 @@
private static class ReduceSchedulingMgr extends TaskSchedulingMgr {
ReduceSchedulingMgr(CapacityTaskScheduler dad) {
super(dad);
- type = TaskSchedulingMgr.TYPE.REDUCE;
+ type = CapacityTaskScheduler.TYPE.REDUCE;
queueComparator = reduceComparator;
}
Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job)
@@ -664,13 +664,18 @@
return System.currentTimeMillis();
}
}
+ // can be replaced with a global type, if we have one
+ protected static enum TYPE {
+ MAP, REDUCE
+ }
+
private Clock clock;
private JobInitializationPoller initializationPoller;
- long limitMaxVmemForTasks;
- long limitMaxPmemForTasks;
- long defaultMaxVmPerTask;
- float defaultPercentOfPmemInVmem;
+ private long memSizeForMapSlotOnJT;
+ private long memSizeForReduceSlotOnJT;
+ private long limitMaxMemForMapTasks;
+ private long limitMaxMemForReduceTasks;
public CapacityTaskScheduler() {
this(new Clock());
@@ -687,37 +692,45 @@
this.schedConf = conf;
}
- /**
- * Normalize the negative values in configuration
- *
- * @param val
- * @return normalized value
- */
- private long normalizeMemoryConfigValue(long val) {
- if (val < 0) {
- val = JobConf.DISABLED_MEMORY_LIMIT;
- }
- return val;
- }
-
private void initializeMemoryRelatedConf() {
- limitMaxVmemForTasks =
- normalizeMemoryConfigValue(conf.getLong(
- JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+ memSizeForMapSlotOnJT =
+ JobConf.normalizeMemoryConfigValue(conf.getLong(
+ JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+ JobConf.DISABLED_MEMORY_LIMIT));
+ memSizeForReduceSlotOnJT =
+ JobConf.normalizeMemoryConfigValue(conf.getLong(
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
JobConf.DISABLED_MEMORY_LIMIT));
+ limitMaxMemForMapTasks =
+ JobConf.normalizeMemoryConfigValue(conf.getLong(
+ JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+ JobConf.DISABLED_MEMORY_LIMIT));
+ limitMaxMemForReduceTasks =
+ JobConf.normalizeMemoryConfigValue(conf.getLong(
+ JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+ JobConf.DISABLED_MEMORY_LIMIT));
+ LOG.info(new StringBuilder().append("Scheduler configured with ").append(
+ "(memSizeForMapSlotOnJT, memSizeForReduceSlotOnJT,").append(
+ " limitMaxMemForMapTasks, limitMaxMemForReduceTasks)").append(
+ memSizeForMapSlotOnJT).append(", ").append(memSizeForReduceSlotOnJT)
+ .append(", ").append(limitMaxMemForMapTasks).append(", ").append(
+ limitMaxMemForReduceTasks).append(")"));
+ }
- limitMaxPmemForTasks =
- normalizeMemoryConfigValue(schedConf.getLimitMaxPmemForTasks());
+ long getMemSizeForMapSlot() {
+ return memSizeForMapSlotOnJT;
+ }
- defaultMaxVmPerTask =
- normalizeMemoryConfigValue(conf.getLong(
- JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
- JobConf.DISABLED_MEMORY_LIMIT));
+ long getMemSizeForReduceSlot() {
+ return memSizeForReduceSlotOnJT;
+ }
- defaultPercentOfPmemInVmem = schedConf.getDefaultPercentOfPmemInVmem();
- if (defaultPercentOfPmemInVmem < 0) {
- defaultPercentOfPmemInVmem = JobConf.DISABLED_MEMORY_LIMIT;
- }
+ long getLimitMaxMemForMapSlot() {
+ return limitMaxMemForMapTasks;
+ }
+
+ long getLimitMaxMemForReduceSlot() {
+ return limitMaxMemForReduceTasks;
}
@Override
@@ -955,14 +968,12 @@
// found a task; return
return Collections.singletonList(tlr.getTask());
}
- else if (TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT ==
- tlr.getLookUpStatus()) {
- // return no task
- return null;
- }
// if we didn't get any, look at map tasks, if TT has space
- else if ((TaskLookupResult.LookUpStatus.NO_TASK_FOUND ==
- tlr.getLookUpStatus()) && (maxMapTasks > currentMapTasks)) {
+ else if ((TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT
+ == tlr.getLookUpStatus() ||
+ TaskLookupResult.LookUpStatus.NO_TASK_FOUND
+ == tlr.getLookUpStatus())
+ && (maxMapTasks > currentMapTasks)) {
mapScheduler.updateCollectionOfQSIs();
tlr = mapScheduler.assignTasks(taskTracker);
if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
@@ -980,13 +991,12 @@
// found a task; return
return Collections.singletonList(tlr.getTask());
}
- else if (TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT ==
- tlr.getLookUpStatus()) {
- return null;
- }
// if we didn't get any, look at reduce tasks, if TT has space
- else if ((TaskLookupResult.LookUpStatus.NO_TASK_FOUND ==
- tlr.getLookUpStatus()) && (maxReduceTasks > currentReduceTasks)) {
+ else if ((TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT
+ == tlr.getLookUpStatus()
+ || TaskLookupResult.LookUpStatus.NO_TASK_FOUND
+ == tlr.getLookUpStatus())
+ && (maxReduceTasks > currentReduceTasks)) {
reduceScheduler.updateCollectionOfQSIs();
tlr = reduceScheduler.assignTasks(taskTracker);
if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
@@ -999,38 +1009,6 @@
return null;
}
- /**
- * Kill the job if it has invalid requirements and return why it is killed
- *
- * @param job
- * @return string mentioning why the job is killed. Null if the job has valid
- * requirements.
- */
- private String killJobIfInvalidRequirements(JobInProgress job) {
- if (!memoryMatcher.isSchedulingBasedOnVmemEnabled()) {
- return null;
- }
- if ((job.getMaxVirtualMemoryForTask() > limitMaxVmemForTasks)
- || (memoryMatcher.isSchedulingBasedOnPmemEnabled() && (job
- .getMaxPhysicalMemoryForTask() > limitMaxPmemForTasks))) {
- String msg =
- job.getJobID() + " (" + job.getMaxVirtualMemoryForTask() + "vmem, "
- + job.getMaxPhysicalMemoryForTask()
- + "pmem) exceeds the cluster's max-memory-limits ("
- + limitMaxVmemForTasks + "vmem, " + limitMaxPmemForTasks
- + "pmem). Cannot run in this cluster, so killing it.";
- LOG.warn(msg);
- try {
- taskTrackerManager.killJob(job.getJobID());
- return msg;
- } catch (IOException ioe) {
- LOG.warn("Failed to kill the job " + job.getJobID() + ". Reason : "
- + StringUtils.stringifyException(ioe));
- }
- }
- return null;
- }
-
// called when a job is added
synchronized void jobAdded(JobInProgress job) throws IOException {
QueueSchedulingInfo qsi =
@@ -1050,13 +1028,6 @@
qsi.numJobsByUser.put(job.getProfile().getUser(), i);
LOG.debug("Job " + job.getJobID().toString() + " is added under user "
+ job.getProfile().getUser() + ", user now has " + i + " jobs");
-
- // Kill the job if it cannot run in the cluster because of invalid
- // resource requirements.
- String statusMsg = killJobIfInvalidRequirements(job);
- if (statusMsg != null) {
- throw new IOException(statusMsg);
- }
}
// called when a job completes
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java Tue May 26 13:30:37 2009
@@ -30,111 +30,33 @@
this.scheduler = capacityTaskScheduler;
}
- boolean isSchedulingBasedOnVmemEnabled() {
- LOG.debug("defaultMaxVmPerTask : " + scheduler.defaultMaxVmPerTask
- + " limitMaxVmemForTasks : " + scheduler.limitMaxVmemForTasks);
- if (scheduler.defaultMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT
- || scheduler.limitMaxVmemForTasks == JobConf.DISABLED_MEMORY_LIMIT) {
+ boolean isSchedulingBasedOnMemEnabled() {
+ if (scheduler.getLimitMaxMemForMapSlot()
+ == JobConf.DISABLED_MEMORY_LIMIT
+ || scheduler.getLimitMaxMemForReduceSlot()
+ == JobConf.DISABLED_MEMORY_LIMIT
+ || scheduler.getMemSizeForMapSlot()
+ == JobConf.DISABLED_MEMORY_LIMIT
+ || scheduler.getMemSizeForReduceSlot()
+ == JobConf.DISABLED_MEMORY_LIMIT) {
return false;
}
return true;
}
- boolean isSchedulingBasedOnPmemEnabled() {
- LOG.debug("defaultPercentOfPmemInVmem : "
- + scheduler.defaultPercentOfPmemInVmem + " limitMaxPmemForTasks : "
- + scheduler.limitMaxPmemForTasks);
- if (scheduler.defaultPercentOfPmemInVmem == JobConf.DISABLED_MEMORY_LIMIT
- || scheduler.limitMaxPmemForTasks == JobConf.DISABLED_MEMORY_LIMIT) {
- return false;
- }
- return true;
- }
-
- /**
- * Obtain the virtual memory allocated for a job's tasks.
- *
- * If the job has a configured value for the max-virtual memory, that will be
- * returned. Else, the cluster-wide default max-virtual memory for tasks is
- * returned.
- *
- * This method can only be called after
- * {@link CapacityTaskScheduler#initializeMemoryRelatedConf()} is invoked.
- *
- * @param jConf JobConf of the job
- * @return the virtual memory allocated for the job's tasks.
- */
- private long getVirtualMemoryForTask(JobConf jConf) {
- long vMemForTask = jConf.getMaxVirtualMemoryForTask();
- if (vMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
- vMemForTask =
- new JobConf().getLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
- scheduler.defaultMaxVmPerTask);
- }
- return vMemForTask;
- }
-
- /**
- * Obtain the physical memory allocated for a job's tasks.
- *
- * If the job has a configured value for the max physical memory, that
- * will be returned. Else, the cluster-wide default physical memory for
- * tasks is returned.
- *
- * This method can only be called after
- * {@link CapacityTaskScheduler#initializeMemoryRelatedConf()} is invoked.
- *
- * @param jConf JobConf of the job
- * @return the physical memory allocated for the job's tasks
- */
- private long getPhysicalMemoryForTask(JobConf jConf) {
- long pMemForTask = jConf.getMaxPhysicalMemoryForTask();
- if (pMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
- pMemForTask =
- Math.round(getVirtualMemoryForTask(jConf)
- * scheduler.defaultPercentOfPmemInVmem);
- }
- return pMemForTask;
- }
-
- static class Memory {
- long vmem;
- long pmem;
-
- Memory(long vm, long pm) {
- this.vmem = vm;
- this.pmem = pm;
- }
- }
-
/**
* Find the memory that is already used by all the running tasks
* residing on the given TaskTracker.
*
* @param taskTracker
+ * @param taskType
* @return amount of memory that is used by the residing tasks,
* null if memory cannot be computed for some reason.
*/
- private synchronized Memory getMemReservedForTasks(
- TaskTrackerStatus taskTracker) {
- boolean disabledVmem = false;
- boolean disabledPmem = false;
-
- if (scheduler.defaultMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT) {
- disabledVmem = true;
- }
-
- if (scheduler.defaultPercentOfPmemInVmem == JobConf.DISABLED_MEMORY_LIMIT) {
- disabledPmem = true;
- }
-
- if (disabledVmem && disabledPmem) {
- return new Memory(JobConf.DISABLED_MEMORY_LIMIT,
- JobConf.DISABLED_MEMORY_LIMIT);
- }
-
+ private synchronized Long getMemReservedForTasks(
+ TaskTrackerStatus taskTracker, CapacityTaskScheduler.TYPE taskType) {
long vmem = 0;
- long pmem = 0;
+ long myVmem = 0;
for (TaskStatus task : taskTracker.getTaskReports()) {
// the following task states are one in which the slot is
@@ -142,12 +64,12 @@
// accounted in used memory.
if ((task.getRunState() == TaskStatus.State.RUNNING)
|| (task.getRunState() == TaskStatus.State.COMMIT_PENDING)) {
- JobInProgress job = scheduler.taskTrackerManager.getJob(
- task.getTaskID().getJobID());
+ JobInProgress job =
+ scheduler.taskTrackerManager.getJob(task.getTaskID().getJobID());
if (job == null) {
// This scenario can happen if a job was completed/killed
- // and retired from JT's memory. In this state, we can ignore
- // the running task status and compute memory for the rest of
+ // and retired from JT's memory. In this state, we can ignore
+ // the running task status and compute memory for the rest of
// the tasks. However, any scheduling done with this computation
// could result in over-subscribing of memory for tasks on this
// TT (as the unaccounted for task is still running).
@@ -155,123 +77,98 @@
// One of the ways of doing that is to return null from here
// and check for null in the calling method.
LOG.info("Task tracker: " + taskTracker.getHost() + " is reporting "
- + "a running / commit pending task: " + task.getTaskID()
- + " but no corresponding job was found. "
- + "Maybe job was retired. Not computing "
- + "memory values for this TT.");
+ + "a running / commit pending task: " + task.getTaskID()
+ + " but no corresponding job was found. "
+ + "Maybe job was retired. Not computing "
+ + "memory values for this TT.");
return null;
}
-
- JobConf jConf =
- scheduler.taskTrackerManager.getJob(task.getTaskID().getJobID())
- .getJobConf();
- if (!disabledVmem) {
- vmem += getVirtualMemoryForTask(jConf);
- }
- if (!disabledPmem) {
- pmem += getPhysicalMemoryForTask(jConf);
+
+ JobConf jConf = job.getJobConf();
+
+ // Get the memory "allotted" for this task by rounding off the job's
+ // tasks' memory limits to the nearest multiple of the slot-memory-size
+ // set on JT. This essentially translates to tasks of a high memory job
+ // using multiple slots.
+ if (task.getIsMap() && taskType.equals(CapacityTaskScheduler.TYPE.MAP)) {
+ myVmem = jConf.getMemoryForMapTask();
+ myVmem =
+ (long) (scheduler.getMemSizeForMapSlot() * Math
+ .ceil((float) myVmem
+ / (float) scheduler.getMemSizeForMapSlot()));
+ } else if (!task.getIsMap()
+ && taskType.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
+ myVmem = jConf.getMemoryForReduceTask();
+ myVmem =
+ (long) (scheduler.getMemSizeForReduceSlot() * Math
+ .ceil((float) myVmem
+ / (float) scheduler.getMemSizeForReduceSlot()));
}
+ vmem += myVmem;
}
}
- return new Memory(vmem, pmem);
+ return Long.valueOf(vmem);
}
/**
- * Check if a TT has enough pmem and vmem to run this job.
* Check if a TT has enough memory to run a task of the specified type from this job.
* @param job
+ * @param taskType
* @param taskTracker
* @return true if this TT has enough memory for this job. False otherwise.
*/
boolean matchesMemoryRequirements(JobInProgress job,
- TaskTrackerStatus taskTracker) {
+ CapacityTaskScheduler.TYPE taskType, TaskTrackerStatus taskTracker) {
- // ////////////// vmem based scheduling
- if (!isSchedulingBasedOnVmemEnabled()) {
- LOG.debug("One of the configuration parameters defaultMaxVmPerTask "
- + "and limitMaxVmemPerTasks is not configured. Scheduling based "
- + "on job's memory requirements is disabled, ignoring any value "
- + "set by job.");
- return true;
- }
-
- TaskTrackerStatus.ResourceStatus resourceStatus =
- taskTracker.getResourceStatus();
- long totalVMemOnTT = resourceStatus.getTotalVirtualMemory();
- long reservedVMemOnTT = resourceStatus.getReservedTotalMemory();
-
- if (totalVMemOnTT == JobConf.DISABLED_MEMORY_LIMIT
- || reservedVMemOnTT == JobConf.DISABLED_MEMORY_LIMIT) {
- return true;
- }
+ LOG.debug("Matching memory requirements of " + job.getJobID().toString()
+ + " for scheduling on " + taskTracker.trackerName);
- if (reservedVMemOnTT > totalVMemOnTT) {
+ if (!isSchedulingBasedOnMemEnabled()) {
+ LOG.debug("Scheduling based on job's memory requirements is disabled."
+ + " Ignoring any value set by job.");
return true;
}
- long jobVMemForTask = job.getMaxVirtualMemoryForTask();
- if (jobVMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
- jobVMemForTask = scheduler.defaultMaxVmPerTask;
- }
-
- Memory memReservedForTasks = getMemReservedForTasks(taskTracker);
- if (memReservedForTasks == null) {
+ Long memUsedOnTT = getMemReservedForTasks(taskTracker, taskType);
+ if (memUsedOnTT == null) {
// For some reason, maybe because we could not find the job
// corresponding to a running task (as can happen if the job
// is retired in between), we could not compute the memory state
// on this TT. Treat this as an error, and fail memory
// requirements.
- LOG.info("Could not compute memory for taskTracker: "
- + taskTracker.getHost() + ". Failing memory requirements.");
+ LOG.info("Could not compute memory for taskTracker: "
+ + taskTracker.getHost() + ". Failing memory requirements.");
return false;
}
- long vmemUsedOnTT = memReservedForTasks.vmem;
- long pmemUsedOnTT = memReservedForTasks.pmem;
- long freeVmemUsedOnTT = totalVMemOnTT - vmemUsedOnTT - reservedVMemOnTT;
+ long totalMemUsableOnTT = 0;
- if (jobVMemForTask > freeVmemUsedOnTT) {
+ long memForThisTask = 0;
+ if (taskType.equals(CapacityTaskScheduler.TYPE.MAP)) {
+ memForThisTask = job.getJobConf().getMemoryForMapTask();
+ totalMemUsableOnTT =
+ scheduler.getMemSizeForMapSlot() * taskTracker.getMaxMapTasks();
+ } else if (taskType.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
+ memForThisTask = job.getJobConf().getMemoryForReduceTask();
+ totalMemUsableOnTT =
+ scheduler.getMemSizeForReduceSlot()
+ * taskTracker.getMaxReduceTasks();
+ }
+
+ long freeMemOnTT = totalMemUsableOnTT - memUsedOnTT.longValue();
+ if (memForThisTask > freeMemOnTT) {
+ LOG.debug("memForThisTask (" + memForThisTask + ") > freeMemOnTT ("
+ + freeMemOnTT + "). A " + taskType + " task from "
+ + job.getJobID().toString() + " cannot be scheduled on TT "
+ + taskTracker.trackerName);
return false;
}
- // ////////////// pmem based scheduling
-
- long totalPmemOnTT = resourceStatus.getTotalPhysicalMemory();
- long reservedPmemOnTT = resourceStatus.getReservedPhysicalMemory();
- long jobPMemForTask = job.getMaxPhysicalMemoryForTask();
- long freePmemUsedOnTT = 0;
-
- if (isSchedulingBasedOnPmemEnabled()) {
- if (totalPmemOnTT == JobConf.DISABLED_MEMORY_LIMIT
- || reservedPmemOnTT == JobConf.DISABLED_MEMORY_LIMIT) {
- return true;
- }
-
- if (reservedPmemOnTT > totalPmemOnTT) {
- return true;
- }
-
- if (jobPMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
- jobPMemForTask =
- Math.round(jobVMemForTask * scheduler.defaultPercentOfPmemInVmem);
- }
-
- freePmemUsedOnTT = totalPmemOnTT - pmemUsedOnTT - reservedPmemOnTT;
-
- if (jobPMemForTask > freePmemUsedOnTT) {
- return false;
- }
- } else {
- LOG.debug("One of the configuration parameters "
- + "defaultPercentOfPmemInVmem and limitMaxPmemPerTasks is not "
- + "configured. Scheduling based on job's physical memory "
- + "requirements is disabled, ignoring any value set by job.");
- }
-
- LOG.debug("freeVMemOnTT = " + freeVmemUsedOnTT + " totalVMemOnTT = "
- + totalVMemOnTT + " freePMemOnTT = " + freePmemUsedOnTT
- + " totalPMemOnTT = " + totalPmemOnTT + " jobVMemForTask = "
- + jobVMemForTask + " jobPMemForTask = " + jobPMemForTask);
+ LOG.debug("memForThisTask = " + memForThisTask + ". freeMemOnTT = "
+ + freeMemOnTT + ". A " + taskType.toString() + " task from "
+ + job.getJobID().toString() + " matches memory requirements on TT "
+ + taskTracker.trackerName);
return true;
}
}
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Tue May 26 13:30:37 2009
@@ -182,8 +182,6 @@
}
mapTaskCtr = 0;
redTaskCtr = 0;
- super.setMaxVirtualMemoryForTask(jobConf.getMaxVirtualMemoryForTask());
- super.setMaxPhysicalMemoryForTask(jobConf.getMaxPhysicalMemoryForTask());
}
@Override
@@ -232,7 +230,7 @@
}
return task;
}
-
+
@Override
public Task obtainNewReduceTask(final TaskTrackerStatus tts,
int clusterSize, int ignored) throws IOException {
@@ -733,7 +731,7 @@
private FakeJobInProgress submitJob(int state, JobConf jobConf) throws IOException {
FakeJobInProgress job =
new FakeJobInProgress(new JobID("test", ++jobCounter),
- (jobConf == null ? new JobConf() : jobConf), taskTrackerManager,
+ (jobConf == null ? new JobConf(conf) : jobConf), taskTrackerManager,
jobConf.getUser());
job.getStatus().setRunState(state);
taskTrackerManager.submitJob(job);
@@ -1504,12 +1502,6 @@
LOG.debug("Starting the scheduler.");
taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
- // Limited TT - 1GB vmem and 512MB pmem
- taskTrackerManager.getTaskTracker("tt1").getResourceStatus()
- .setTotalVirtualMemory(1 * 1024 * 1024 * 1024L);
- taskTrackerManager.getTaskTracker("tt1").getResourceStatus()
- .setTotalPhysicalMemory(512 * 1024 * 1024L);
-
taskTrackerManager.addQueues(new String[] { "default" });
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
@@ -1519,11 +1511,11 @@
// memory-based scheduling disabled by default.
scheduler.start();
- LOG.debug("Submit one high memory(3GB vmem, 1GBpmem) job of 1 map task "
- + "and 1 reduce task.");
+ LOG.debug("Submit one high memory job of 1 3GB map task "
+ + "and 1 1GB reduce task.");
JobConf jConf = new JobConf();
- jConf.setMaxVirtualMemoryForTask(3 * 1024 * 1024 * 1024L); // 3GB vmem
- jConf.setMaxPhysicalMemoryForTask(1 * 1024 * 1024 * 1024L); // 1 GB pmem
+ jConf.setMemoryForMapTask(3 * 1024L); // 3GB
+ jConf.setMemoryForReduceTask(1 * 1024L); // 1 GB
jConf.setNumMapTasks(1);
jConf.setNumReduceTasks(1);
jConf.setQueueName("default");
@@ -1538,197 +1530,57 @@
}
/**
- * Test to verify that highPmemJobs are scheduled like all other jobs when
- * physical-memory based scheduling is not enabled.
- * @throws IOException
- */
- public void testDisabledPmemBasedScheduling()
- throws IOException {
-
- LOG.debug("Starting the scheduler.");
- taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
-
- // Limited TT - 100GB vmem and 500MB pmem
- TaskTrackerStatus.ResourceStatus ttStatus =
- taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
- ttStatus.setTotalVirtualMemory(100 * 1024 * 1024 * 1024L);
- ttStatus.setReservedVirtualMemory(0);
- ttStatus.setTotalPhysicalMemory(500 * 1024 * 1024L);
- ttStatus.setReservedPhysicalMemory(0);
-
- taskTrackerManager.addQueues(new String[] { "default" });
- ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
- resConf.setFakeQueues(queues);
- scheduler.setResourceManagerConf(resConf);
- scheduler.setTaskTrackerManager(taskTrackerManager);
- // enable vmem-based scheduling. pmem based scheduling disabled by default.
- scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
- 1536 * 1024 * 1024L);
- scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
- 3 * 1024 * 1024 * 1024L);
- scheduler.start();
-
- LOG.debug("Submit one high pmem(3GB vmem, 1GBpmem) job of 1 map task "
- + "and 1 reduce task.");
- JobConf jConf = new JobConf();
- jConf.setMaxVirtualMemoryForTask(3 * 1024 * 1024 * 1024L); // 3GB vmem
- jConf.setMaxPhysicalMemoryForTask(1 * 1024 * 1024 * 1024L); // 1 GB pmem
- jConf.setNumMapTasks(1);
- jConf.setNumReduceTasks(1);
- jConf.setQueueName("default");
- jConf.setUser("u1");
- submitJobAndInit(JobStatus.RUNNING, jConf);
-
- // assert that all tasks are launched even though they transgress the
- // scheduling limits.
-
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
- }
-
- /**
- * Test HighMemoryJobs.
- * @throws IOException
- */
- public void testHighMemoryJobs()
- throws IOException {
-
- LOG.debug("Starting the scheduler.");
- taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
-
- TaskTrackerStatus.ResourceStatus ttStatus =
- taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
- ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
- ttStatus.setReservedVirtualMemory(0);
- ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024L);
- ttStatus.setReservedPhysicalMemory(0);
- // Normal job on this TT would be 1.5GB vmem, 0.5GB pmem
-
- taskTrackerManager.addQueues(new String[] { "default" });
- ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
- resConf.setFakeQueues(queues);
- scheduler.setTaskTrackerManager(taskTrackerManager);
- // enabled memory-based scheduling
- scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
- 1536 * 1024 * 1024L);
- scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
- 3 * 1024 * 1024 * 1024L);
- resConf.setDefaultPercentOfPmemInVmem(33.3f);
- resConf.setLimitMaxPmemForTasks(1 * 1024 * 1024 * 1024L);
- scheduler.setResourceManagerConf(resConf);
- scheduler.start();
-
- LOG.debug("Submit one high memory(1600MB vmem, 400MB pmem) job of "
- + "1 map task and 1 reduce task.");
- JobConf jConf = new JobConf();
- jConf.setMaxVirtualMemoryForTask(1600 * 1024 * 1024L); // 1.6GB vmem
- jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem
- jConf.setNumMapTasks(1);
- jConf.setNumReduceTasks(1);
- jConf.setQueueName("default");
- jConf.setUser("u1");
- jConf.setMapSpeculativeExecution(false);
- jConf.setReduceSpeculativeExecution(false);
- FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-
- // No more tasks of this job can run on the TT because of lack of vmem
- assertNull(scheduler.assignTasks(tracker("tt1")));
-
- // Let attempt_test_0001_m_000001_0 finish, task assignment should succeed.
- taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", job1);
- checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
-
- LOG.debug("Submit another high memory(1200MB vmem, 800MB pmem) job of "
- + "1 map task and 0 reduces.");
- jConf.setMaxVirtualMemoryForTask(1200 * 1024 * 1024L);
- jConf.setMaxPhysicalMemoryForTask(800 * 1024 * 1024L);
- jConf.setNumMapTasks(1);
- jConf.setNumReduceTasks(0);
- jConf.setQueueName("default");
- jConf.setUser("u1");
- jConf.setMapSpeculativeExecution(false);
- jConf.setReduceSpeculativeExecution(false);
- submitJobAndInit(JobStatus.PREP, jConf); // job2
-
- // This job shouldn't run the TT now because of lack of pmem
- assertNull(scheduler.assignTasks(tracker("tt1")));
-
- // Let attempt_test_0001_m_000002_0 finish, task assignment should succeed.
- taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", job1);
- checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-
- LOG.debug("Submit a normal memory(200MB vmem, 100MB pmem) job of "
- + "0 maps and 1 reduce task.");
- jConf.setMaxVirtualMemoryForTask(200 * 1024 * 1024L);
- jConf.setMaxPhysicalMemoryForTask(100 * 1024 * 1024L);
- jConf.setNumMapTasks(0);
- jConf.setNumReduceTasks(1);
- jConf.setQueueName("default");
- jConf.setUser("u1");
- submitJobAndInit(JobStatus.PREP, jConf); // job3
-
- checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1");
- }
-
- /**
- * Test HADOOP-4979.
- * Bug fix for making sure we always return null to TT if there is a
- * high-mem job, and not look at reduce jobs (if map tasks are high-mem)
- * or vice-versa.
+ * Test reverting HADOOP-4979. If there is a high-mem job, we should now look
+ * at reduce jobs (if map tasks are high-mem) or vice-versa.
+ *
* @throws IOException
*/
- public void testHighMemoryBlocking()
+ public void testHighMemoryBlockingAcrossTaskTypes()
throws IOException {
// 2 map and 1 reduce slots
taskTrackerManager = new FakeTaskTrackerManager(1, 2, 1);
- TaskTrackerStatus.ResourceStatus ttStatus =
- taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
- ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
- ttStatus.setReservedVirtualMemory(0);
- ttStatus.setTotalPhysicalMemory(1536 * 1024 * 1024L);
- ttStatus.setReservedPhysicalMemory(0);
- // Normal job on this TT would be 1GB vmem, 0.5GB pmem
-
taskTrackerManager.addQueues(new String[] { "default" });
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
resConf.setFakeQueues(queues);
scheduler.setTaskTrackerManager(taskTrackerManager);
// enabled memory-based scheduling
- scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
- 1 * 1024 * 1024 * 1024L);
- scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
- 3 * 1024 * 1024 * 1024L);
- resConf.setDefaultPercentOfPmemInVmem(33.3f);
- resConf.setLimitMaxPmemForTasks(1536 * 1024 * 1024L);
+ // Normal job in the cluster would be 1GB maps/reduces
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+ 2 * 1024);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+ 1 * 1024);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
- // We need a situation where the scheduler needs to run a map task,
- // but the available one has a high-mem requirement. There should
- // be another job whose maps or reduces can run, but they shouldn't
- // be scheduled.
+ // The situation : Two jobs in the queue. First job with only maps and no
+ // reduces and is a high memory job. Second job is a normal job with both maps and reduces.
+ // First job cannot run for want of memory for maps. In this case, second job's reduces should run.
- LOG.debug("Submit one high memory(2GB vmem, 400MB pmem) job of "
+ LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of "
+ "2 map tasks");
- JobConf jConf = new JobConf();
- jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L); // 2GB vmem
- jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem
+ JobConf jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(2 * 1024);
+ jConf.setMemoryForReduceTask(0);
jConf.setNumMapTasks(2);
jConf.setNumReduceTasks(0);
jConf.setQueueName("default");
jConf.setUser("u1");
FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
- LOG.debug("Submit another regular memory(900MB vmem, 200MB pmem) job of "
+
+ LOG.debug("Submit another regular memory(1GB vmem maps/reduces) job of "
+ "2 map/red tasks");
- jConf = new JobConf();
- jConf.setMaxVirtualMemoryForTask(900 * 1024 * 1024L); // 900MB vmem
- jConf.setMaxPhysicalMemoryForTask(200 * 1024 * 1024L); // 200MB pmem
+ jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(1 * 1024);
+ jConf.setMemoryForReduceTask(1 * 1024);
jConf.setNumMapTasks(2);
jConf.setNumReduceTasks(2);
jConf.setQueueName("default");
@@ -1738,75 +1590,8 @@
// first, a map from j1 will run
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
// at this point, the scheduler tries to schedule another map from j1.
- // there isn't enough space. There is space to run the second job's
- // map or reduce task, but they shouldn't be scheduled
- assertNull(scheduler.assignTasks(tracker("tt1")));
- }
-
- /**
- * test invalid highMemoryJobs
- * @throws IOException
- */
- public void testHighMemoryJobWithInvalidRequirements()
- throws IOException {
- LOG.debug("Starting the scheduler.");
- taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
- TaskTrackerStatus.ResourceStatus ttStatus =
- taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
- ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024);
- ttStatus.setReservedVirtualMemory(0);
- ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024);
- ttStatus.setReservedPhysicalMemory(0);
-
- ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
- taskTrackerManager.addQueues(new String[] { "default" });
- resConf.setFakeQueues(queues);
- scheduler.setTaskTrackerManager(taskTrackerManager);
- // enabled memory-based scheduling
- long vmemUpperLimit = 1 * 1024 * 1024 * 1024L;
- long vmemDefault = 1536 * 1024 * 1024L;
- long pmemUpperLimit = vmemUpperLimit;
- scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
- vmemDefault);
- scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
- vmemUpperLimit);
- resConf.setDefaultPercentOfPmemInVmem(33.3f);
- resConf.setLimitMaxPmemForTasks(pmemUpperLimit);
- scheduler.setResourceManagerConf(resConf);
- scheduler.start();
-
- LOG.debug("Submit one invalid high ram(5GB vmem, 3GB pmem) job of "
- + "1 map, 0 reduce tasks.");
- long jobMaxVmem = 5 * 1024 * 1024 * 1024L;
- long jobMaxPmem = 3 * 1024 * 1024 * 1024L;
- JobConf jConf = new JobConf();
- jConf.setMaxVirtualMemoryForTask(jobMaxVmem);
- jConf.setMaxPhysicalMemoryForTask(jobMaxPmem);
- jConf.setNumMapTasks(1);
- jConf.setNumReduceTasks(0);
- jConf.setQueueName("default");
- jConf.setUser("u1");
-
- boolean throwsException = false;
- String msg = null;
- FakeJobInProgress job;
- try {
- job = submitJob(JobStatus.PREP, jConf);
- } catch (IOException ioe) {
- // job has to fail
- throwsException = true;
- msg = ioe.getMessage();
- }
-
- assertTrue(throwsException);
- job = (FakeJobInProgress) taskTrackerManager.getJobs().toArray()[0];
- assertTrue(msg.matches(job.getJobID() + " \\(" + jobMaxVmem + "vmem, "
- + jobMaxPmem + "pmem\\) exceeds the cluster's max-memory-limits \\("
- + vmemUpperLimit + "vmem, " + pmemUpperLimit
- + "pmem\\). Cannot run in this cluster, so killing it."));
- // For job, no cleanup task needed so gets killed immediately.
- assertTrue(job.getStatus().getRunState() == JobStatus.KILLED);
+ // there isn't enough space. The second job's reduce should be scheduled.
+ checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
}
/**
@@ -1817,13 +1602,7 @@
throws IOException {
LOG.debug("Starting the scheduler.");
- taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
- TaskTrackerStatus.ResourceStatus ttStatus =
- taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
- ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
- ttStatus.setReservedVirtualMemory(0);
- ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024L);
- ttStatus.setReservedPhysicalMemory(0);
+ taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
@@ -1831,68 +1610,65 @@
resConf.setFakeQueues(queues);
scheduler.setTaskTrackerManager(taskTrackerManager);
// enabled memory-based scheduling
- scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
- 1536 * 1024 * 1024L);
- scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
- 4 * 1024 * 1024 * 1024L);
- resConf.setDefaultPercentOfPmemInVmem(33.3f);
- resConf.setLimitMaxPmemForTasks(2 * 1024 * 1024 * 1024L);
+ // Normal jobs 1GB maps/reduces. 2GB limit on maps/reduces
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+ 2 * 1024);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+ 2 * 1024);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
- LOG.debug("Submit one high memory(4GB vmem, 512MB pmem) job of "
+ LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
+ "1 map, 0 reduce tasks.");
- JobConf jConf = new JobConf();
- jConf.setMaxVirtualMemoryForTask(4 * 1024 * 1024 * 1024L);
- jConf.setMaxPhysicalMemoryForTask(512 * 1024 * 1024L);
+ JobConf jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(1 * 1024);
+ jConf.setMemoryForReduceTask(1 * 1024);
jConf.setNumMapTasks(1);
- jConf.setNumReduceTasks(0);
+ jConf.setNumReduceTasks(1);
jConf.setQueueName("default");
jConf.setUser("u1");
FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
- // TTs should not run these jobs i.e. cluster blocked because of lack of
- // vmem
- assertNull(scheduler.assignTasks(tracker("tt1")));
- assertNull(scheduler.assignTasks(tracker("tt1")));
- // Job should still be alive
- assertTrue(job1.getStatus().getRunState() == JobStatus.RUNNING);
-
- LOG.debug("Submit a normal job of 1 map, 0 reduce tasks.");
- // Use cluster-wide defaults
- jConf.setMaxVirtualMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
- jConf.setMaxPhysicalMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
+ // Fill the second tt with this job.
+ checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+ checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+
+ LOG.debug("Submit one high memory(2GB maps/reduces) job of "
+ + "2 map, 2 reduce tasks.");
+ jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(2 * 1024);
+ jConf.setMemoryForReduceTask(2 * 1024);
+ jConf.setNumMapTasks(2);
+ jConf.setNumReduceTasks(2);
+ jConf.setQueueName("default");
+ jConf.setUser("u1");
FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
- // cluster should still be blocked for job1 and so even job2 should not run
- // even though it is a normal job
- assertNull(scheduler.assignTasks(tracker("tt1")));
-
- scheduler.taskTrackerManager.killJob(job2.getJobID());
- scheduler.taskTrackerManager.killJob(job1.getJobID());
+ checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
- LOG.debug("Submit one high memory(2GB vmem, 2GB pmem) job of "
+ LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
+ "1 map, 0 reduce tasks.");
- jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L);
- jConf.setMaxPhysicalMemoryForTask(2 * 1024 * 1024 * 1024L);
+ jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(1 * 1024);
+ jConf.setMemoryForReduceTask(1 * 1024);
+ jConf.setNumMapTasks(1);
+ jConf.setNumReduceTasks(1);
+ jConf.setQueueName("default");
+ jConf.setUser("u1");
FakeJobInProgress job3 = submitJobAndInit(JobStatus.PREP, jConf);
- // TTs should not run these jobs i.e. cluster blocked because of lack of
- // pmem now.
- assertNull(scheduler.assignTasks(tracker("tt1")));
- assertNull(scheduler.assignTasks(tracker("tt1")));
-
- // Job should still be alive
- assertTrue(job3.getStatus().getRunState() == JobStatus.RUNNING);
- LOG.debug("Submit a normal job of 1 map, 0 reduce tasks.");
- // Use cluster-wide defaults
- jConf.setMaxVirtualMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
- jConf.setMaxPhysicalMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
- submitJobAndInit(JobStatus.PREP, jConf); // job4
-
- // cluster should still be blocked for job3 and so even job4 should not run
- // even though it is a normal job
+ // Job2 cannot fit on tt2 or tt1. Blocking. Job3 also will not run.
assertNull(scheduler.assignTasks(tracker("tt1")));
+ assertNull(scheduler.assignTasks(tracker("tt2")));
+ assertNull(scheduler.assignTasks(tracker("tt1")));
+ assertNull(scheduler.assignTasks(tracker("tt2")));
}
/**
@@ -1906,13 +1682,6 @@
// create a cluster with a single node.
LOG.debug("Starting cluster with 1 tasktracker, 2 map and 2 reduce slots");
taskTrackerManager = new FakeTaskTrackerManager(1, 2, 2);
- TaskTrackerStatus.ResourceStatus ttStatus =
- taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
- LOG.debug("Assume TT has 4 GB virtual mem and 2 GB RAM");
- ttStatus.setTotalVirtualMemory(4 * 1024 * 1024 * 1024L);
- ttStatus.setReservedVirtualMemory(0);
- ttStatus.setTotalPhysicalMemory(2 * 1024 * 1024 * 1024L);
- ttStatus.setReservedPhysicalMemory(0);
// create scheduler
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
@@ -1921,14 +1690,17 @@
resConf.setFakeQueues(queues);
scheduler.setTaskTrackerManager(taskTrackerManager);
// enabled memory-based scheduling
- LOG.debug("By default, jobs get 0.5 GB per task vmem" +
- " and 2 GB max vmem, with 50% of it for RAM");
- scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
- 512 * 1024 * 1024L);
- scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
- 2 * 1024 * 1024 * 1024L);
- resConf.setDefaultPercentOfPmemInVmem(50.0f);
- resConf.setLimitMaxPmemForTasks(1 * 1024 * 1024 * 1024L);
+ LOG.debug("Assume TT has 2GB for maps and 2GB for reduces");
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+ 2 * 1024L);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 512);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+ 2 * 1024L);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 512);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
@@ -1937,6 +1709,8 @@
JobConf jConf = new JobConf();
jConf.setNumMapTasks(2);
jConf.setNumReduceTasks(2);
+ jConf.setMemoryForMapTask(512);
+ jConf.setMemoryForReduceTask(512);
jConf.setQueueName("default");
jConf.setUser("u1");
FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
@@ -1955,6 +1729,8 @@
jConf = new JobConf();
jConf.setNumMapTasks(1);
jConf.setNumReduceTasks(1);
+ jConf.setMemoryForMapTask(512);
+ jConf.setMemoryForReduceTask(512);
jConf.setQueueName("default");
jConf.setUser("u1");
FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
@@ -2338,21 +2114,8 @@
*/
public void testHighRamJobWithSpeculativeExecution() throws IOException {
// 2 map and 2 reduce slots
- taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
-
- //task tracker memory configurations.
- TaskTrackerStatus.ResourceStatus ttStatus =
- taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
- ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
- ttStatus.setReservedVirtualMemory(0);
- ttStatus.setTotalPhysicalMemory(1536 * 1024 * 1024L);
- ttStatus.setReservedPhysicalMemory(0);
- ttStatus = taskTrackerManager.getTaskTracker("tt2").getResourceStatus();
- ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
- ttStatus.setReservedVirtualMemory(0);
- ttStatus.setTotalPhysicalMemory(1536 * 1024 * 1024L);
- ttStatus.setReservedPhysicalMemory(0);
-
+ taskTrackerManager = new FakeTaskTrackerManager(2, 3, 3);
+ // 1GB for each map, 1GB for each reduce
taskTrackerManager.addQueues(new String[] { "default" });
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
@@ -2360,19 +2123,23 @@
resConf.setFakeQueues(queues);
scheduler.setTaskTrackerManager(taskTrackerManager);
// enabled memory-based scheduling
- scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
- 1 * 1024 * 1024 * 1024L);
- scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
- 3 * 1024 * 1024 * 1024L);
- resConf.setDefaultPercentOfPmemInVmem(33.3f);
- resConf.setLimitMaxPmemForTasks(1536 * 1024 * 1024L);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+ 3 * 1024L);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024L);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+ 3 * 1024L);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024L);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
-
+ //Submit a high memory job with speculative tasks.
JobConf jConf = new JobConf();
- jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L); // 2GB vmem
- jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem
+ jConf.setMemoryForMapTask(2 * 1024);
+ jConf.setMemoryForReduceTask(0);
jConf.setNumMapTasks(1);
jConf.setNumReduceTasks(0);
jConf.setQueueName("default");
@@ -2381,20 +2148,18 @@
jConf.setReduceSpeculativeExecution(false);
FakeJobInProgress job1 = new FakeJobInProgress(new JobID("test", ++jobCounter),
jConf, taskTrackerManager,"u1");
-
- //Submit a high memory job with speculative tasks.
taskTrackerManager.submitJob(job1);
-
+
+ //Submit normal job
jConf = new JobConf();
- jConf.setMaxVirtualMemoryForTask(100 * 1024 * 1024L); // 100MB vmem
- jConf.setMaxPhysicalMemoryForTask(50 * 1024 * 1024L); // 50MB pmem
+ jConf.setMemoryForMapTask(1 * 1024);
+ jConf.setMemoryForReduceTask(0);
jConf.setNumMapTasks(1);
jConf.setNumReduceTasks(0);
jConf.setQueueName("default");
jConf.setUser("u1");
jConf.setMapSpeculativeExecution(false);
jConf.setReduceSpeculativeExecution(false);
- //Submit normal job
FakeJobInProgress job2 = submitJob(JobStatus.PREP, jConf);
controlledInitializationPoller.selectJobsToInitialize();
@@ -2429,8 +2194,8 @@
//Now submit high ram job with speculative reduce and check.
jConf = new JobConf();
- jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L); // 2GB vmem
- jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem
+ jConf.setMemoryForMapTask(2 * 1024);
+ jConf.setMemoryForReduceTask(2 * 1024L);
jConf.setNumMapTasks(1);
jConf.setNumReduceTasks(1);
jConf.setQueueName("default");
@@ -2439,76 +2204,43 @@
jConf.setReduceSpeculativeExecution(true);
FakeJobInProgress job3 = new FakeJobInProgress(new JobID("test", ++jobCounter),
jConf, taskTrackerManager,"u1");
-
- //Submit a high memory job with speculative reduce tasks.
taskTrackerManager.submitJob(job3);
-
+
+ //Submit normal job w.r.t reduces
jConf = new JobConf();
- jConf.setMaxVirtualMemoryForTask(100 * 1024 * 1024L); // 100MB vmem
- jConf.setMaxPhysicalMemoryForTask(50 * 1024 * 1024L); // 50MB pmem
+ jConf.setMemoryForMapTask(2 * 1024L);
+ jConf.setMemoryForReduceTask(1 * 104L);
jConf.setNumMapTasks(1);
jConf.setNumReduceTasks(1);
jConf.setQueueName("default");
jConf.setUser("u1");
jConf.setMapSpeculativeExecution(false);
jConf.setReduceSpeculativeExecution(false);
- //Submit normal job
FakeJobInProgress job4 = submitJob(JobStatus.PREP, jConf);
controlledInitializationPoller.selectJobsToInitialize();
raiseStatusChangeEvents(scheduler.jobQueuesManager);
- //all maps of jobs get assigned to same task tracker as
- //job does not have speculative map and same tracker sends two heart
- //beat back to back.
+
+ // Finish up the map scheduler
checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
- //first map slot gets attention on this tracker.
checkAssignment("tt2", "attempt_test_0004_m_000001_0 on tt2");
- //now first reduce of the job3 would be scheduled on tt2 since it has
- //memory.
- //assigntasks() would check for free reduce slot is greater than
- //map slots. Seeing there is more free reduce slot it would try scheduling
- //reduce of job1 but would block as in it is a high memory task.
- assertNull(scheduler.assignTasks(tracker("tt1")));
- //TT2 would get the reduce task from high memory job as the tt is running
- //normal jobs map. which is low mem.
- checkAssignment("tt2", "attempt_test_0003_r_000001_0 on tt2");
- // now if either TT comes back, it will block because all maps
- // are done, and the first jobs reduce has a speculative task.
+
+ // first, a reduce from j3 will run
+ // at this point, there is a speculative task for the same job to be
+ //scheduled. This task would be scheduled. Till the tasks from job3 gets
+ //complete none of the tasks from other jobs would be scheduled.
+ checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1");
+ assertEquals("pending reduces greater than zero " , job3.pendingMaps(), 0);
+ //make the same tracker come back and check that it is blocked. The job
+ //has a speculative reduce task, so the tracker should be blocked even
+ //though it could run job4's reduce.
assertNull(scheduler.assignTasks(tracker("tt1")));
- assertNull(scheduler.assignTasks(tracker("tt2")));
- //finish maps.
- taskTrackerManager.finishTask("tt1", "attempt_test_0003_m_000001_0",
- job3);
- taskTrackerManager.finishTask("tt2", "attempt_test_0004_m_000001_0",
- job4);
- //check speculative reduce code path is covered.
- assertEquals("Pending reduces not zero for high " +
- "ram job with speculative reduce.", 0, job3.pendingReduces());
- //if tt2 returns back it is not given any task even if it can schedule
- //job2 reduce.
- assertNull(scheduler.assignTasks(tracker("tt2")));
- //speculative reduce of the job3 would be scheduled.
- checkAssignment("tt1", "attempt_test_0003_r_000001_1 on tt1");
- //now both speculative and actual task have been scheduled for job3.
- //Normal task of Job4 would now be scheduled on TT1 as it has free space
- //to run.
+ //TT2 now gets the speculative reduce of job3
+ checkAssignment("tt2", "attempt_test_0003_r_000001_1 on tt2");
+
+ // Now that j3 has no more speculative reduces, the scheduler can
+ // schedule j4's tasks.
checkAssignment("tt1", "attempt_test_0004_r_000001_0 on tt1");
- //No more tasks.
- assertNull(scheduler.assignTasks(tracker("tt2")));
- assertNull(scheduler.assignTasks(tracker("tt1")));
-
- //finish all the reduces.
- taskTrackerManager.finishTask("tt1", "attempt_test_0003_r_000001_1",
- job3);
- taskTrackerManager.finishTask("tt2", "attempt_test_0003_r_000001_0",
- job3);
- //finish the job
- taskTrackerManager.finalizeJob(job3);
- //finish the task and the job.
- taskTrackerManager.finishTask("tt1", "attempt_test_0004_r_000001_0",
- job4);
- taskTrackerManager.finalizeJob(job4);
-
}
private void checkRunningJobMovementAndCompletion() throws IOException {
Modified: hadoop/core/trunk/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/mapred-default.xml?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/mapred-default.xml (original)
+++ hadoop/core/trunk/src/mapred/mapred-default.xml Tue May 26 13:30:37 2009
@@ -186,42 +186,6 @@
</property>
<property>
- <name>mapred.tasktracker.pmem.reserved</name>
- <value>-1</value>
- <description>Configuration property to specify the amount of physical memory
- that has to be reserved by the TaskTracker for system usage (OS, TT etc).
- The reserved physical memory should be a part of the total physical memory
- available on the TaskTracker.
-
- The reserved physical memory and the total physical memory values are
- reported by the TaskTracker as part of heart-beat so that they can
- considered by a scheduler. Please refer to the documentation of the
- configured scheduler to see how this property is used.
- </description>
-</property>
-
-<property>
- <name>mapred.task.default.maxvmem</name>
- <value>-1</value>
- <description>
- Cluster-wide configuration in bytes to be set by the administrators that
- provides default amount of maximum virtual memory for job's tasks. This has
- to be set on both the JobTracker node for the sake of scheduling decisions
- and on the TaskTracker nodes for the sake of memory management.
-
- If a job doesn't specify its virtual memory requirement by setting
- mapred.task.maxvmem to -1, tasks are assured a memory limit set
- to this property. This property is set to -1 by default.
-
- This value should in general be less than the cluster-wide
- configuration mapred.task.limit.maxvmem. If not or if it is not set,
- TaskTracker's memory management will be disabled and a scheduler's memory
- based scheduling decisions may be affected. Please refer to the
- documentation of the configured scheduler to see how this property is used.
- </description>
-</property>
-
-<property>
<name>mapred.task.limit.maxvmem</name>
<value>-1</value>
<description>
@@ -272,23 +236,6 @@
</property>
<property>
- <name>mapred.task.maxpmem</name>
- <value>-1</value>
- <description>
- The maximum amount of physical memory any task of a job will use in bytes.
-
- This value may be used by schedulers that support scheduling based on job's
- memory requirements. In general, a task of this job will be scheduled on a
- TaskTracker, only if the amount of physical memory still unoccupied on the
- TaskTracker is greater than or equal to this value. Different schedulers can
- take different decisions, some might just ignore this value. Please refer to
- the documentation of the scheduler being configured to see if it does
- memory based scheduling and if it does, how this variable is used by that
- scheduler.
- </description>
-</property>
-
-<property>
<name>mapred.tasktracker.memory_calculator_plugin</name>
<value></value>
<description>
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java Tue May 26 13:30:37 2009
@@ -123,111 +123,11 @@
*/
public static final String DEFAULT_QUEUE_NAME = "default";
- /**
- * Cluster-wide configuration to be set by the administrators that provides
- * default amount of maximum virtual memory for job's tasks. This has to be
- * set on both the JobTracker node for the sake of scheduling decisions and on
- * the TaskTracker nodes for the sake of memory management.
- *
- * <p>
- *
- * If a job doesn't specify its virtual memory requirement by setting
- * {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to {@link #DISABLED_MEMORY_LIMIT},
- * tasks are assured a memory limit set to this property. This property is
- * disabled by default, and if not explicitly set to a valid value by the
- * administrators and if a job doesn't specify its virtual memory
- * requirements, the job's tasks will not be assured anything and may be
- * killed by a TT that intends to control the total memory usage of the tasks
- * via memory management functionality.
- *
- * <p>
- *
- * This value should in general be less than the cluster-wide configuration
- * {@link #UPPER_LIMIT_ON_TASK_VMEM_PROPERTY} . If not or if it not set,
- * TaskTracker's memory management may be disabled and a scheduler's memory
- * based scheduling decisions will be affected. Please refer to the
- * documentation of the configured scheduler to see how this property is used.
- */
- public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY =
- "mapred.task.default.maxvmem";
+ static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY =
+ "mapred.job.map.memory.mb";
- /**
- * The maximum amount of memory any task of this job will use.
- *
- * <p>
- *
- * This value will be used by TaskTrackers for monitoring the memory usage of
- * tasks of this jobs. If a TaskTracker's memory management functionality is
- * enabled, each task of this job will be allowed to use a maximum virtual
- * memory specified by this property. If the task's memory usage goes over
- * this value, the task will be failed by the TT. If not set, the cluster-wide
- * configuration {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} is used as the
- * default value for memory requirements. If this property cascaded with
- * {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} becomes equal to -1, job's
- * tasks will not be assured anything and may be killed by a TT that intends
- * to control the total memory usage of the tasks via memory management
- * functionality. If the memory management functionality is disabled on a TT,
- * this value is ignored.
- *
- * <p>
- *
- * This value should also be not more than the cluster-wide configuration
- * {@link #UPPER_LIMIT_ON_TASK_VMEM_PROPERTY} which has to be set by the site
- * administrators.
- *
- * <p>
- *
- * This value may be used by schedulers that support scheduling based on job's
- * memory requirements. In general, a task of this job will be scheduled on a
- * TaskTracker only if the amount of virtual memory still unoccupied on the
- * TaskTracker is greater than or equal to this value. But different
- * schedulers can take different decisions. Please refer to the documentation
- * of the scheduler being configured to see if it does memory based scheduling
- * and if it does, how this property is used by that scheduler.
- *
- * @see #setMaxVirtualMemoryForTask(long)
- * @see #getMaxVirtualMemoryForTask()
- */
- public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
- "mapred.task.maxvmem";
-
- /**
- * The maximum amount of physical memory any task of a job will use.
- *
- * <p>
- *
- * This value may be used by schedulers that support scheduling based on job's
- * memory requirements. In general, a task of this job will be scheduled on a
- * TaskTracker, only if the amount of physical memory still unoccupied on the
- * TaskTracker is greater than or equal to this value. But different
- * schedulers can take different decisions. Please refer to the documentation
- * of the scheduler being configured to see how it does memory based
- * scheduling and how this variable is used by that scheduler.
- *
- * @see #setMaxPhysicalMemoryForTask(long)
- * @see #getMaxPhysicalMemoryForTask()
- */
- public static final String MAPRED_TASK_MAXPMEM_PROPERTY =
- "mapred.task.maxpmem";
-
- /**
- * Cluster-wide configuration to be set by the site administrators that
- * provides an upper limit on the maximum virtual memory that can be specified
- * by a job. The job configuration {@link #MAPRED_TASK_MAXVMEM_PROPERTY} and
- * the cluster-wide configuration
- * {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} should, by definition, be
- * less than this value. If the job configuration
- * {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} is more than this value,
- * depending on the scheduler being configured, the job may be rejected or the
- * job configuration may just be ignored.
- *
- * <p>
- *
- * If it is not set on a TaskTracker, TaskTracker's memory management will be
- * disabled.
- */
- public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY =
- "mapred.task.limit.maxvmem";
+ static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
+ "mapred.job.reduce.memory.mb";
/**
* Construct a map/reduce job configuration.
@@ -1491,53 +1391,23 @@
public String getJobLocalDir() {
return get("job.local.dir");
}
-
- /**
- * The maximum amount of memory any task of this job will use. See
- * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
- *
- * @return The maximum amount of memory any task of this job will use, in
- * bytes.
- * @see #setMaxVirtualMemoryForTask(long)
- */
- public long getMaxVirtualMemoryForTask() {
- return getLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
+
+ long getMemoryForMapTask() {
+ return getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
+ DISABLED_MEMORY_LIMIT);
}
- /**
- * Set the maximum amount of memory any task of this job can use. See
- * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
- *
- * @param vmem Maximum amount of virtual memory in bytes any task of this job
- * can use.
- * @see #getMaxVirtualMemoryForTask()
- */
- public void setMaxVirtualMemoryForTask(long vmem) {
- setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, vmem);
+ void setMemoryForMapTask(long mem) {
+ setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem);
}
- /**
- * The maximum amount of physical memory any task of this job will use. See
- * {@link #MAPRED_TASK_MAXPMEM_PROPERTY}
- *
- * @return The maximum amount of physical memory any task of this job will
- * use, in bytes.
- * @see #setMaxPhysicalMemoryForTask(long)
- */
- public long getMaxPhysicalMemoryForTask() {
- return getLong(JobConf.MAPRED_TASK_MAXPMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
+ long getMemoryForReduceTask() {
+ return getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
+ DISABLED_MEMORY_LIMIT);
}
- /**
- * Set the maximum amount of physical memory any task of this job can use. See
- * {@link #MAPRED_TASK_MAXPMEM_PROPERTY}
- *
- * @param pmem Maximum amount of physical memory in bytes any task of this job
- * can use.
- * @see #getMaxPhysicalMemoryForTask()
- */
- public void setMaxPhysicalMemoryForTask(long pmem) {
- setLong(JobConf.MAPRED_TASK_MAXPMEM_PROPERTY, pmem);
+ void setMemoryForReduceTask(long mem) {
+ setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
}
/**
@@ -1559,6 +1429,19 @@
set("mapred.job.queue.name", queueName);
}
+ /**
+ * Normalize the negative values in configuration
+ *
+ * @param val
+ * @return normalized value
+ */
+ public static long normalizeMemoryConfigValue(long val) {
+ if (val < 0) {
+ val = DISABLED_MEMORY_LIMIT;
+ }
+ return val;
+ }
+
/**
* Find a jar that contains a class of the same name, if any.
* It will return a jar file, even if that is not the first thing
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Tue May 26 13:30:37 2009
@@ -178,8 +178,6 @@
private boolean hasSpeculativeMaps;
private boolean hasSpeculativeReduces;
private long inputLength = 0;
- private long maxVirtualMemoryForTask;
- private long maxPhysicalMemoryForTask;
private Counters jobCounters = new Counters();
@@ -276,8 +274,6 @@
this.nonRunningReduces = new LinkedList<TaskInProgress>();
this.runningReduces = new LinkedHashSet<TaskInProgress>();
this.resourceEstimator = new ResourceEstimator(this);
- setMaxVirtualMemoryForTask(conf.getMaxVirtualMemoryForTask());
- setMaxPhysicalMemoryForTask(conf.getMaxPhysicalMemoryForTask());
}
/**
@@ -418,6 +414,7 @@
(numMapTasks + numReduceTasks) +
" exceeds the configured limit " + maxTasks);
}
+
jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
@@ -559,23 +556,6 @@
JobHistory.JobInfo.logJobPriority(jobId, priority);
}
- // Accessors for resources.
- long getMaxVirtualMemoryForTask() {
- return maxVirtualMemoryForTask;
- }
-
- void setMaxVirtualMemoryForTask(long maxVMem) {
- maxVirtualMemoryForTask = maxVMem;
- }
-
- long getMaxPhysicalMemoryForTask() {
- return maxPhysicalMemoryForTask;
- }
-
- void setMaxPhysicalMemoryForTask(long maxPMem) {
- maxPhysicalMemoryForTask = maxPMem;
- }
-
// Update the job start/launch time (upon restart) and log to history
synchronized void updateJobInfo(long startTime, long launchTime) {
// log and change to the job's start/launch time
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue May 26 13:30:37 2009
@@ -1530,6 +1530,11 @@
private final UserGroupInformation mrOwner;
private final String supergroup;
+ long limitMaxMemForMapTasks;
+ long limitMaxMemForReduceTasks;
+ long memSizeForMapSlotOnJT;
+ long memSizeForReduceSlotOnJT;
+
private QueueManager queueManager;
/**
@@ -1568,6 +1573,8 @@
this.conf = conf;
JobConf jobConf = new JobConf(conf);
+ initializeTaskMemoryRelatedConfig();
+
// Read the hosts/exclude files to restrict access to the jobtracker.
this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
conf.get("mapred.hosts.exclude", ""));
@@ -3040,6 +3047,15 @@
throw ioe;
}
+ // Check the job if it cannot run in the cluster because of invalid memory
+ // requirements.
+ try {
+ checkMemoryRequirements(job);
+ } catch (IOException ioe) {
+ new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));
+ throw ioe;
+ }
+
return addJob(jobId, job);
}
@@ -3303,6 +3319,16 @@
}
TaskCompletionEvent[] EMPTY_EVENTS = new TaskCompletionEvent[0];
+
+ static final String MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY =
+ "mapred.cluster.map.memory.mb";
+ static final String MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY =
+ "mapred.cluster.reduce.memory.mb";
+
+ static final String MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY =
+ "mapred.cluster.max.map.memory.mb";
+ static final String MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY =
+ "mapred.cluster.max.reduce.memory.mb";
/*
* Returns a list of TaskCompletionEvent for the given job,
@@ -3774,4 +3800,81 @@
UserGroupInformation.getCurrentUGI().getUserName());
this.queueManager.refreshAcls(new Configuration(this.conf));
}
+
+ private void initializeTaskMemoryRelatedConfig() {
+ memSizeForMapSlotOnJT =
+ JobConf.normalizeMemoryConfigValue(conf.getLong(
+ JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+ JobConf.DISABLED_MEMORY_LIMIT));
+ memSizeForReduceSlotOnJT =
+ JobConf.normalizeMemoryConfigValue(conf.getLong(
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+ JobConf.DISABLED_MEMORY_LIMIT));
+ limitMaxMemForMapTasks =
+ JobConf.normalizeMemoryConfigValue(conf.getLong(
+ JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+ JobConf.DISABLED_MEMORY_LIMIT));
+ limitMaxMemForReduceTasks =
+ JobConf.normalizeMemoryConfigValue(conf.getLong(
+ JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+ JobConf.DISABLED_MEMORY_LIMIT));
+ LOG.info(new StringBuilder().append("Scheduler configured with ").append(
+ "(memSizeForMapSlotOnJT, memSizeForReduceSlotOnJT,").append(
+ " limitMaxMemForMapTasks, limitMaxMemForReduceTasks) (").append(
+ memSizeForMapSlotOnJT).append(", ").append(memSizeForReduceSlotOnJT)
+ .append(", ").append(limitMaxMemForMapTasks).append(", ").append(
+ limitMaxMemForReduceTasks).append(")"));
+ }
+
+ private boolean perTaskMemoryConfigurationSetOnJT() {
+ if (limitMaxMemForMapTasks == JobConf.DISABLED_MEMORY_LIMIT
+ || limitMaxMemForReduceTasks == JobConf.DISABLED_MEMORY_LIMIT
+ || memSizeForMapSlotOnJT == JobConf.DISABLED_MEMORY_LIMIT
+ || memSizeForReduceSlotOnJT == JobConf.DISABLED_MEMORY_LIMIT) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Check the job if it has invalid requirements and throw an IOException if it does.
+ *
+ * @param job
+ * @throws IOException
+ */
+ private void checkMemoryRequirements(JobInProgress job)
+ throws IOException {
+ if (!perTaskMemoryConfigurationSetOnJT()) {
+ LOG.debug("Per-Task memory configuration is not set on JT. "
+ + "Not checking the job for invalid memory requirements.");
+ return;
+ }
+
+ boolean invalidJob = false;
+ String msg = "";
+ long maxMemForMapTask = job.getJobConf().getMemoryForMapTask();
+ long maxMemForReduceTask = job.getJobConf().getMemoryForReduceTask();
+
+ if (maxMemForMapTask == JobConf.DISABLED_MEMORY_LIMIT
+ || maxMemForReduceTask == JobConf.DISABLED_MEMORY_LIMIT) {
+ invalidJob = true;
+ msg = "Invalid job requirements.";
+ }
+
+ if (maxMemForMapTask > limitMaxMemForMapTasks
+ || maxMemForReduceTask > limitMaxMemForReduceTasks) {
+ invalidJob = true;
+ msg = "Exceeds the cluster's max-memory-limit.";
+ }
+
+ if (invalidJob) {
+ StringBuilder jobStr =
+ new StringBuilder().append(job.getJobID().toString()).append("(")
+ .append(maxMemForMapTask).append(" memForMapTasks ").append(
+ maxMemForReduceTask).append(" memForReduceTasks): ");
+ LOG.warn(jobStr.toString() + msg);
+
+ throw new IOException(jobStr.toString() + msg);
+ }
+ }
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Tue May 26 13:30:37 2009
@@ -59,8 +59,7 @@
tasksToBeRemoved = new ArrayList<TaskAttemptID>();
maxMemoryAllowedForAllTasks =
- taskTracker.getTotalVirtualMemoryOnTT()
- - taskTracker.getReservedVirtualMemory();
+ taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L;
monitoringInterval = taskTracker.getJobConf().getLong(
"mapred.tasktracker.taskmemorymanager.monitoring-interval", 5000L);
@@ -202,17 +201,6 @@
LOG.info("Memory usage of ProcessTree " + pId + " :"
+ currentMemUsage + "bytes. Limit : " + limit + "bytes");
- if (limit > taskTracker.getLimitMaxVMemPerTask()) {
- // TODO: With monitoring enabled and no scheduling based on
- // memory,users can seriously hijack the system by specifying memory
- // requirements well above the cluster wide limit. Ideally these
- // jobs should have been rejected by JT/scheduler. Because we can't
- // do that, in the minimum we should fail the tasks and hence the
- // job.
- LOG.warn("Task " + tid
- + " 's maxVmemPerTask is greater than TT's limitMaxVmPerTask");
- }
-
if (limit != JobConf.DISABLED_MEMORY_LIMIT
&& currentMemUsage > limit) {
// Task (the root process) is still alive and overflowing memory.
@@ -242,12 +230,11 @@
}
}
- LOG.debug("Memory still in usage across all tasks : " + memoryStillInUsage
- + "bytes. Total limit : " + maxMemoryAllowedForAllTasks);
-
if (memoryStillInUsage > maxMemoryAllowedForAllTasks) {
- LOG.warn("The total memory usage is still overflowing TTs limits."
- + " Trying to kill a few tasks with the least progress.");
+ LOG.warn("The total memory in usage " + memoryStillInUsage
+ + " is still overflowing TTs limits "
+ + maxMemoryAllowedForAllTasks
+ + ". Trying to kill a few tasks with the least progress.");
killTasksWithLeastProgress(memoryStillInUsage);
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Tue May 26 13:30:37 2009
@@ -368,7 +368,7 @@
vargs.add(Integer.toString(address.getPort()));
vargs.add(taskid.toString()); // pass task identifier
- tracker.addToMemoryManager(t.getTaskID(), conf);
+ tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);
// set memory limit using ulimit if feasible and necessary ...
String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);