You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2009/05/26 15:30:38 UTC

svn commit: r778696 [1/2] - in /hadoop/core/trunk: ./ conf/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/mapred/ src/mapred/org/apache/hadoop/mapred/ src/test/ma...

Author: yhemanth
Date: Tue May 26 13:30:37 2009
New Revision: 778696

URL: http://svn.apache.org/viewvc?rev=778696&view=rev
Log:
HADOOP-5881. Simplify memory monitoring and scheduling related configuration. Contributed by Vinod Kumar Vavilapalli.

Added:
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/conf/capacity-scheduler.xml.template
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/core/trunk/src/mapred/mapred-default.xml
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue May 26 13:30:37 2009
@@ -686,6 +686,9 @@
     HADOOP-5726. Remove pre-emption from capacity scheduler code base.
     (Rahul Kumar Singh via yhemanth)
 
+    HADOOP-5881. Simplify memory monitoring and scheduling related
+    configuration. (Vinod Kumar Vavilapalli via yhemanth)
+
   NEW FEATURES
 
   IMPROVEMENTS

Modified: hadoop/core/trunk/conf/capacity-scheduler.xml.template
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/capacity-scheduler.xml.template?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/conf/capacity-scheduler.xml.template (original)
+++ hadoop/core/trunk/conf/capacity-scheduler.xml.template Tue May 26 13:30:37 2009
@@ -56,34 +56,6 @@
       account in scheduling decisions by default in a job queue.
     </description>
   </property>
-
-  <property>
-    <name>mapred.capacity-scheduler.task.default-pmem-percentage-in-vmem</name>
-    <value>-1</value>
-    <description>A percentage (float) of the default VM limit for jobs
-   	  (mapred.task.default.maxvm). This is the default RAM task-limit 
-   	  associated with a task. Unless overridden by a job's setting, this 
-   	  number defines the RAM task-limit.
-
-      If this property is missing, or set to an invalid value, scheduling 
-      based on physical memory, RAM, is disabled.  
-    </description>
-  </property>
-
-  <property>
-    <name>mapred.capacity-scheduler.task.limit.maxpmem</name>
-    <value>-1</value>
-    <description>Configuration that provides an upper limit on the maximum
-      physical memory that can be specified by a job. The job configuration
-      mapred.task.maxpmem should be less than this value. If not, the job will
-      be rejected by the scheduler.
-      
-      If it is set to -1, scheduler will not consider physical memory for
-      scheduling even if virtual memory based scheduling is enabled(by setting
-      valid values for both mapred.task.default.maxvmem and
-      mapred.task.limit.maxvmem).
-    </description>
-  </property>
   
   <property>
     <name>mapred.capacity-scheduler.default-minimum-user-limit-percent</name>

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Tue May 26 13:30:37 2009
@@ -351,44 +351,4 @@
     rmConf.setInt(
         "mapred.capacity-scheduler.init-worker-threads", poolSize);
   }
-
-  /**
-   * Get the upper limit on the maximum physical memory that can be specified by
-   * a job.
-   * 
-   * @return upper limit for max pmem for tasks.
-   */
-  public long getLimitMaxPmemForTasks() {
-    return rmConf.getLong(UPPER_LIMIT_ON_TASK_PMEM_PROPERTY,
-        JobConf.DISABLED_MEMORY_LIMIT);
-  }
-
-  /**
-   * Get the upper limit on the maximum physical memory that can be specified by
-   * a job.
-   * 
-   * @param value
-   */
-  public void setLimitMaxPmemForTasks(long value) {
-    rmConf.setLong(UPPER_LIMIT_ON_TASK_PMEM_PROPERTY, value);
-  }
-
-  /**
-   * Get cluster-wide default percentage of pmem in vmem.
-   * 
-   * @return cluster-wide default percentage of pmem in vmem.
-   */
-  public float getDefaultPercentOfPmemInVmem() {
-    return rmConf.getFloat(DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY,
-        JobConf.DISABLED_MEMORY_LIMIT);
-  }
-
-  /**
-   * Set cluster-wide default percentage of pmem in vmem.
-   * 
-   * @param value
-   */
-  public void setDefaultPercentOfPmemInVmem(float value) {
-    rmConf.setFloat(DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY, value);
-  }
 }

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Tue May 26 13:30:37 2009
@@ -278,11 +278,7 @@
 
     /** our TaskScheduler object */
     protected CapacityTaskScheduler scheduler;
-    // can be replaced with a global type, if we have one
-    protected static enum TYPE {
-      MAP, REDUCE
-    }
-    protected TYPE type = null;
+    protected CapacityTaskScheduler.TYPE type = null;
 
     abstract Task obtainNewTask(TaskTrackerStatus taskTracker, 
         JobInProgress job) throws IOException; 
@@ -413,7 +409,8 @@
         //If this job meets memory requirements. Ask the JobInProgress for
         //a task to be scheduled on the task tracker.
         //if we find a job then we pass it on.
-        if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) {
+        if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
+            taskTracker)) {
           // We found a suitable job. Get task from it.
           Task t = obtainNewTask(taskTracker, j);
           //if there is a task return it immediately.
@@ -422,6 +419,8 @@
             return TaskLookupResult.getTaskFoundResult(t);
           } else {
             //skip to the next job in the queue.
+            LOG.debug("Job " + j.getJobID().toString()
+                + " returned no tasks of type " + type);
             continue;
           }
         } else {
@@ -456,7 +455,8 @@
         if (j.getStatus().getRunState() != JobStatus.RUNNING) {
           continue;
         }
-        if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) {
+        if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
+            taskTracker)) {
           // We found a suitable job. Get task from it.
           Task t = obtainNewTask(taskTracker, j);
           //if there is a task return it immediately.
@@ -561,7 +561,7 @@
   private static class MapSchedulingMgr extends TaskSchedulingMgr {
     MapSchedulingMgr(CapacityTaskScheduler dad) {
       super(dad);
-      type = TaskSchedulingMgr.TYPE.MAP;
+      type = CapacityTaskScheduler.TYPE.MAP;
       queueComparator = mapComparator;
     }
     Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) 
@@ -603,7 +603,7 @@
   private static class ReduceSchedulingMgr extends TaskSchedulingMgr {
     ReduceSchedulingMgr(CapacityTaskScheduler dad) {
       super(dad);
-      type = TaskSchedulingMgr.TYPE.REDUCE;
+      type = CapacityTaskScheduler.TYPE.REDUCE;
       queueComparator = reduceComparator;
     }
     Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) 
@@ -664,13 +664,18 @@
       return System.currentTimeMillis();
     }
   }
+  // can be replaced with a global type, if we have one
+  protected static enum TYPE {
+    MAP, REDUCE
+  }
+
   private Clock clock;
   private JobInitializationPoller initializationPoller;
 
-  long limitMaxVmemForTasks;
-  long limitMaxPmemForTasks;
-  long defaultMaxVmPerTask;
-  float defaultPercentOfPmemInVmem;
+  private long memSizeForMapSlotOnJT;
+  private long memSizeForReduceSlotOnJT;
+  private long limitMaxMemForMapTasks;
+  private long limitMaxMemForReduceTasks;
 
   public CapacityTaskScheduler() {
     this(new Clock());
@@ -687,37 +692,45 @@
     this.schedConf = conf;
   }
 
-  /**
-   * Normalize the negative values in configuration
-   * 
-   * @param val
-   * @return normalized value
-   */
-  private long normalizeMemoryConfigValue(long val) {
-    if (val < 0) {
-      val = JobConf.DISABLED_MEMORY_LIMIT;
-    }
-    return val;
-  }
-
   private void initializeMemoryRelatedConf() {
-    limitMaxVmemForTasks =
-        normalizeMemoryConfigValue(conf.getLong(
-            JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+    memSizeForMapSlotOnJT =
+        JobConf.normalizeMemoryConfigValue(conf.getLong(
+            JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+    memSizeForReduceSlotOnJT =
+        JobConf.normalizeMemoryConfigValue(conf.getLong(
+            JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
             JobConf.DISABLED_MEMORY_LIMIT));
+    limitMaxMemForMapTasks =
+        JobConf.normalizeMemoryConfigValue(conf.getLong(
+            JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+    limitMaxMemForReduceTasks =
+        JobConf.normalizeMemoryConfigValue(conf.getLong(
+            JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+    LOG.info(new StringBuilder().append("Scheduler configured with ").append(
+        "(memSizeForMapSlotOnJT, memSizeForReduceSlotOnJT,").append(
+        " limitMaxMemForMapTasks, limitMaxMemForReduceTasks)").append(
+        memSizeForMapSlotOnJT).append(", ").append(memSizeForReduceSlotOnJT)
+        .append(", ").append(limitMaxMemForMapTasks).append(", ").append(
+            limitMaxMemForReduceTasks).append(")"));
+  }
 
-    limitMaxPmemForTasks =
-        normalizeMemoryConfigValue(schedConf.getLimitMaxPmemForTasks());
+  long getMemSizeForMapSlot() {
+    return memSizeForMapSlotOnJT;
+  }
 
-    defaultMaxVmPerTask =
-        normalizeMemoryConfigValue(conf.getLong(
-            JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-            JobConf.DISABLED_MEMORY_LIMIT));
+  long getMemSizeForReduceSlot() {
+    return memSizeForReduceSlotOnJT;
+  }
 
-    defaultPercentOfPmemInVmem = schedConf.getDefaultPercentOfPmemInVmem();
-    if (defaultPercentOfPmemInVmem < 0) {
-      defaultPercentOfPmemInVmem = JobConf.DISABLED_MEMORY_LIMIT;
-    }
+  long getLimitMaxMemForMapSlot() {
+    return limitMaxMemForMapTasks;
+  }
+
+  long getLimitMaxMemForReduceSlot() {
+    return limitMaxMemForReduceTasks;
   }
 
   @Override
@@ -955,14 +968,12 @@
         // found a task; return
         return Collections.singletonList(tlr.getTask());
       }
-      else if (TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT == 
-        tlr.getLookUpStatus()) {
-        // return no task
-        return null;
-      }
       // if we didn't get any, look at map tasks, if TT has space
-      else if ((TaskLookupResult.LookUpStatus.NO_TASK_FOUND == 
-        tlr.getLookUpStatus()) && (maxMapTasks > currentMapTasks)) {
+      else if ((TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT
+                                  == tlr.getLookUpStatus() ||
+                TaskLookupResult.LookUpStatus.NO_TASK_FOUND
+                                  == tlr.getLookUpStatus())
+          && (maxMapTasks > currentMapTasks)) {
         mapScheduler.updateCollectionOfQSIs();
         tlr = mapScheduler.assignTasks(taskTracker);
         if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
@@ -980,13 +991,12 @@
         // found a task; return
         return Collections.singletonList(tlr.getTask());
       }
-      else if (TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT == 
-        tlr.getLookUpStatus()) {
-        return null;
-      }
       // if we didn't get any, look at reduce tasks, if TT has space
-      else if ((TaskLookupResult.LookUpStatus.NO_TASK_FOUND == 
-        tlr.getLookUpStatus()) && (maxReduceTasks > currentReduceTasks)) {
+      else if ((TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT
+                                    == tlr.getLookUpStatus()
+                || TaskLookupResult.LookUpStatus.NO_TASK_FOUND
+                                    == tlr.getLookUpStatus())
+          && (maxReduceTasks > currentReduceTasks)) {
         reduceScheduler.updateCollectionOfQSIs();
         tlr = reduceScheduler.assignTasks(taskTracker);
         if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
@@ -999,38 +1009,6 @@
     return null;
   }
 
-  /**
-   * Kill the job if it has invalid requirements and return why it is killed
-   * 
-   * @param job
-   * @return string mentioning why the job is killed. Null if the job has valid
-   *         requirements.
-   */
-  private String killJobIfInvalidRequirements(JobInProgress job) {
-    if (!memoryMatcher.isSchedulingBasedOnVmemEnabled()) {
-      return null;
-    }
-    if ((job.getMaxVirtualMemoryForTask() > limitMaxVmemForTasks)
-        || (memoryMatcher.isSchedulingBasedOnPmemEnabled() && (job
-            .getMaxPhysicalMemoryForTask() > limitMaxPmemForTasks))) {
-      String msg =
-          job.getJobID() + " (" + job.getMaxVirtualMemoryForTask() + "vmem, "
-              + job.getMaxPhysicalMemoryForTask()
-              + "pmem) exceeds the cluster's max-memory-limits ("
-              + limitMaxVmemForTasks + "vmem, " + limitMaxPmemForTasks
-              + "pmem). Cannot run in this cluster, so killing it.";
-      LOG.warn(msg);
-      try {
-        taskTrackerManager.killJob(job.getJobID());
-        return msg;
-      } catch (IOException ioe) {
-        LOG.warn("Failed to kill the job " + job.getJobID() + ". Reason : "
-            + StringUtils.stringifyException(ioe));
-      }
-    }
-    return null;
-  }
-
   // called when a job is added
   synchronized void jobAdded(JobInProgress job) throws IOException {
     QueueSchedulingInfo qsi = 
@@ -1050,13 +1028,6 @@
     qsi.numJobsByUser.put(job.getProfile().getUser(), i);
     LOG.debug("Job " + job.getJobID().toString() + " is added under user " 
               + job.getProfile().getUser() + ", user now has " + i + " jobs");
-
-    // Kill the job if it cannot run in the cluster because of invalid
-    // resource requirements.
-    String statusMsg = killJobIfInvalidRequirements(job);
-    if (statusMsg != null) {
-      throw new IOException(statusMsg);
-    }
   }
 
   // called when a job completes

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java Tue May 26 13:30:37 2009
@@ -30,111 +30,33 @@
     this.scheduler = capacityTaskScheduler;
   }
 
-  boolean isSchedulingBasedOnVmemEnabled() {
-    LOG.debug("defaultMaxVmPerTask : " + scheduler.defaultMaxVmPerTask
-        + " limitMaxVmemForTasks : " + scheduler.limitMaxVmemForTasks);
-    if (scheduler.defaultMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT
-        || scheduler.limitMaxVmemForTasks == JobConf.DISABLED_MEMORY_LIMIT) {
+  boolean isSchedulingBasedOnMemEnabled() {
+    if (scheduler.getLimitMaxMemForMapSlot()
+                                  == JobConf.DISABLED_MEMORY_LIMIT
+        || scheduler.getLimitMaxMemForReduceSlot()
+                                  == JobConf.DISABLED_MEMORY_LIMIT
+        || scheduler.getMemSizeForMapSlot()
+                                  == JobConf.DISABLED_MEMORY_LIMIT
+        || scheduler.getMemSizeForReduceSlot()
+                                  == JobConf.DISABLED_MEMORY_LIMIT) {
       return false;
     }
     return true;
   }
 
-  boolean isSchedulingBasedOnPmemEnabled() {
-    LOG.debug("defaultPercentOfPmemInVmem : "
-        + scheduler.defaultPercentOfPmemInVmem + " limitMaxPmemForTasks : "
-        + scheduler.limitMaxPmemForTasks);
-    if (scheduler.defaultPercentOfPmemInVmem == JobConf.DISABLED_MEMORY_LIMIT
-        || scheduler.limitMaxPmemForTasks == JobConf.DISABLED_MEMORY_LIMIT) {
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * Obtain the virtual memory allocated for a job's tasks.
-   * 
-   * If the job has a configured value for the max-virtual memory, that will be
-   * returned. Else, the cluster-wide default max-virtual memory for tasks is
-   * returned.
-   * 
-   * This method can only be called after
-   * {@link CapacityTaskScheduler#initializeMemoryRelatedConf()} is invoked.
-   * 
-   * @param jConf JobConf of the job
-   * @return the virtual memory allocated for the job's tasks.
-   */
-  private long getVirtualMemoryForTask(JobConf jConf) {
-    long vMemForTask = jConf.getMaxVirtualMemoryForTask();
-    if (vMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
-      vMemForTask =
-          new JobConf().getLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-              scheduler.defaultMaxVmPerTask);
-    }
-    return vMemForTask;
-  }
-
-  /**
-   * Obtain the physical memory allocated for a job's tasks.
-   * 
-   * If the job has a configured value for the max physical memory, that
-   * will be returned. Else, the cluster-wide default physical memory for
-   * tasks is returned.
-   * 
-   * This method can only be called after
-   * {@link CapacityTaskScheduler#initializeMemoryRelatedConf()} is invoked.
-   * 
-   * @param jConf JobConf of the job
-   * @return the physical memory allocated for the job's tasks
-   */
-  private long getPhysicalMemoryForTask(JobConf jConf) {
-    long pMemForTask = jConf.getMaxPhysicalMemoryForTask();
-    if (pMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
-      pMemForTask =
-          Math.round(getVirtualMemoryForTask(jConf)
-              * scheduler.defaultPercentOfPmemInVmem);
-    }
-    return pMemForTask;
-  }
-
-  static class Memory {
-    long vmem;
-    long pmem;
-
-    Memory(long vm, long pm) {
-      this.vmem = vm;
-      this.pmem = pm;
-    }
-  }
-
   /**
    * Find the memory that is already used by all the running tasks
    * residing on the given TaskTracker.
    * 
    * @param taskTracker
+   * @param taskType 
    * @return amount of memory that is used by the residing tasks,
    *          null if memory cannot be computed for some reason.
    */
-  private synchronized Memory getMemReservedForTasks(
-      TaskTrackerStatus taskTracker) {
-    boolean disabledVmem = false;
-    boolean disabledPmem = false;
-
-    if (scheduler.defaultMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT) {
-      disabledVmem = true;
-    }
-
-    if (scheduler.defaultPercentOfPmemInVmem == JobConf.DISABLED_MEMORY_LIMIT) {
-      disabledPmem = true;
-    }
-
-    if (disabledVmem && disabledPmem) {
-      return new Memory(JobConf.DISABLED_MEMORY_LIMIT,
-          JobConf.DISABLED_MEMORY_LIMIT);
-    }
-
+  private synchronized Long getMemReservedForTasks(
+      TaskTrackerStatus taskTracker, CapacityTaskScheduler.TYPE taskType) {
     long vmem = 0;
-    long pmem = 0;
+    long myVmem = 0;
 
     for (TaskStatus task : taskTracker.getTaskReports()) {
       // the following task states are one in which the slot is
@@ -142,12 +64,12 @@
       // accounted in used memory.
       if ((task.getRunState() == TaskStatus.State.RUNNING)
           || (task.getRunState() == TaskStatus.State.COMMIT_PENDING)) {
-        JobInProgress job = scheduler.taskTrackerManager.getJob(
-                                              task.getTaskID().getJobID());
+        JobInProgress job =
+            scheduler.taskTrackerManager.getJob(task.getTaskID().getJobID());
         if (job == null) {
           // This scenario can happen if a job was completed/killed
-          // and retired from JT's memory. In this state, we can ignore 
-          // the running task status and compute memory for the rest of 
+          // and retired from JT's memory. In this state, we can ignore
+          // the running task status and compute memory for the rest of
           // the tasks. However, any scheduling done with this computation
           // could result in over-subscribing of memory for tasks on this
           // TT (as the unaccounted for task is still running).
@@ -155,123 +77,98 @@
           // One of the ways of doing that is to return null from here
           // and check for null in the calling method.
           LOG.info("Task tracker: " + taskTracker.getHost() + " is reporting "
-                    + "a running / commit pending task: " + task.getTaskID()
-                    + " but no corresponding job was found. "
-                    + "Maybe job was retired. Not computing "
-                    + "memory values for this TT.");
+              + "a running / commit pending task: " + task.getTaskID()
+              + " but no corresponding job was found. "
+              + "Maybe job was retired. Not computing "
+              + "memory values for this TT.");
           return null;
         }
-        
-        JobConf jConf =
-            scheduler.taskTrackerManager.getJob(task.getTaskID().getJobID())
-                .getJobConf();
-        if (!disabledVmem) {
-          vmem += getVirtualMemoryForTask(jConf);
-        }
-        if (!disabledPmem) {
-          pmem += getPhysicalMemoryForTask(jConf);
+
+        JobConf jConf = job.getJobConf();
+
+        // Get the memory "allotted" for this task by rounding off the job's
+        // tasks' memory limits to the nearest multiple of the slot-memory-size
+        // set on JT. This essentially translates to tasks of a high memory job
+        // using multiple slots.
+        if (task.getIsMap() && taskType.equals(CapacityTaskScheduler.TYPE.MAP)) {
+          myVmem = jConf.getMemoryForMapTask();
+          myVmem =
+              (long) (scheduler.getMemSizeForMapSlot() * Math
+                  .ceil((float) myVmem
+                      / (float) scheduler.getMemSizeForMapSlot()));
+        } else if (!task.getIsMap()
+            && taskType.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
+          myVmem = jConf.getMemoryForReduceTask();
+          myVmem =
+              (long) (scheduler.getMemSizeForReduceSlot() * Math
+                  .ceil((float) myVmem
+                      / (float) scheduler.getMemSizeForReduceSlot()));
         }
+        vmem += myVmem;
       }
     }
 
-    return new Memory(vmem, pmem);
+    return Long.valueOf(vmem);
   }
 
   /**
-   * Check if a TT has enough pmem and vmem to run this job.
+   * Check if a TT has enough memory to run of task specified from this job.
    * @param job
+   * @param taskType 
    * @param taskTracker
    * @return true if this TT has enough memory for this job. False otherwise.
    */
   boolean matchesMemoryRequirements(JobInProgress job,
-      TaskTrackerStatus taskTracker) {
+      CapacityTaskScheduler.TYPE taskType, TaskTrackerStatus taskTracker) {
 
-    // ////////////// vmem based scheduling
-    if (!isSchedulingBasedOnVmemEnabled()) {
-      LOG.debug("One of the configuration parameters defaultMaxVmPerTask "
-          + "and limitMaxVmemPerTasks is not configured. Scheduling based "
-          + "on job's memory requirements is disabled, ignoring any value "
-          + "set by job.");
-      return true;
-    }
-
-    TaskTrackerStatus.ResourceStatus resourceStatus =
-        taskTracker.getResourceStatus();
-    long totalVMemOnTT = resourceStatus.getTotalVirtualMemory();
-    long reservedVMemOnTT = resourceStatus.getReservedTotalMemory();
-
-    if (totalVMemOnTT == JobConf.DISABLED_MEMORY_LIMIT
-        || reservedVMemOnTT == JobConf.DISABLED_MEMORY_LIMIT) {
-      return true;
-    }
+    LOG.debug("Matching memory requirements of " + job.getJobID().toString()
+        + " for scheduling on " + taskTracker.trackerName);
 
-    if (reservedVMemOnTT > totalVMemOnTT) {
+    if (!isSchedulingBasedOnMemEnabled()) {
+      LOG.debug("Scheduling based on job's memory requirements is disabled."
+          + " Ignoring any value set by job.");
       return true;
     }
 
-    long jobVMemForTask = job.getMaxVirtualMemoryForTask();
-    if (jobVMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
-      jobVMemForTask = scheduler.defaultMaxVmPerTask;
-    }
-
-    Memory memReservedForTasks = getMemReservedForTasks(taskTracker);
-    if (memReservedForTasks == null) {
+    Long memUsedOnTT = getMemReservedForTasks(taskTracker, taskType);
+    if (memUsedOnTT == null) {
       // For some reason, maybe because we could not find the job
       // corresponding to a running task (as can happen if the job
       // is retired in between), we could not compute the memory state
       // on this TT. Treat this as an error, and fail memory
       // requirements.
-      LOG.info("Could not compute memory for taskTracker: " 
-                + taskTracker.getHost() + ". Failing memory requirements.");
+      LOG.info("Could not compute memory for taskTracker: "
+          + taskTracker.getHost() + ". Failing memory requirements.");
       return false;
     }
-    long vmemUsedOnTT = memReservedForTasks.vmem;
-    long pmemUsedOnTT = memReservedForTasks.pmem;
 
-    long freeVmemUsedOnTT = totalVMemOnTT - vmemUsedOnTT - reservedVMemOnTT;
+    long totalMemUsableOnTT = 0;
 
-    if (jobVMemForTask > freeVmemUsedOnTT) {
+    long memForThisTask = 0;
+    if (taskType.equals(CapacityTaskScheduler.TYPE.MAP)) {
+      memForThisTask = job.getJobConf().getMemoryForMapTask();
+      totalMemUsableOnTT =
+          scheduler.getMemSizeForMapSlot() * taskTracker.getMaxMapTasks();
+    } else if (taskType.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
+      memForThisTask = job.getJobConf().getMemoryForReduceTask();
+      totalMemUsableOnTT =
+          scheduler.getMemSizeForReduceSlot()
+              * taskTracker.getMaxReduceTasks();
+    }
+
+    long freeMemOnTT = totalMemUsableOnTT - memUsedOnTT.longValue();
+    if (memForThisTask > freeMemOnTT) {
+      LOG.debug("memForThisTask (" + memForThisTask + ") > freeMemOnTT ("
+          + freeMemOnTT + "). A " + taskType + " task from "
+          + job.getJobID().toString() + " cannot be scheduled on TT "
+          + taskTracker.trackerName);
       return false;
     }
 
-    // ////////////// pmem based scheduling
-
-    long totalPmemOnTT = resourceStatus.getTotalPhysicalMemory();
-    long reservedPmemOnTT = resourceStatus.getReservedPhysicalMemory();
-    long jobPMemForTask = job.getMaxPhysicalMemoryForTask();
-    long freePmemUsedOnTT = 0;
-
-    if (isSchedulingBasedOnPmemEnabled()) {
-      if (totalPmemOnTT == JobConf.DISABLED_MEMORY_LIMIT
-          || reservedPmemOnTT == JobConf.DISABLED_MEMORY_LIMIT) {
-        return true;
-      }
-
-      if (reservedPmemOnTT > totalPmemOnTT) {
-        return true;
-      }
-
-      if (jobPMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
-        jobPMemForTask =
-            Math.round(jobVMemForTask * scheduler.defaultPercentOfPmemInVmem);
-      }
-
-      freePmemUsedOnTT = totalPmemOnTT - pmemUsedOnTT - reservedPmemOnTT;
-
-      if (jobPMemForTask > freePmemUsedOnTT) {
-        return false;
-      }
-    } else {
-      LOG.debug("One of the configuration parameters "
-          + "defaultPercentOfPmemInVmem and limitMaxPmemPerTasks is not "
-          + "configured. Scheduling based on job's physical memory "
-          + "requirements is disabled, ignoring any value set by job.");
-    }
-
-    LOG.debug("freeVMemOnTT = " + freeVmemUsedOnTT + " totalVMemOnTT = "
-        + totalVMemOnTT + " freePMemOnTT = " + freePmemUsedOnTT
-        + " totalPMemOnTT = " + totalPmemOnTT + " jobVMemForTask = "
-        + jobVMemForTask + " jobPMemForTask = " + jobPMemForTask);
+    LOG.debug("memForThisTask = " + memForThisTask + ". freeMemOnTT = "
+        + freeMemOnTT + ". A " + taskType.toString() + " task from "
+        + job.getJobID().toString() + " matches memory requirements on TT "
+        + taskTracker.trackerName);
     return true;
   }
 }

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Tue May 26 13:30:37 2009
@@ -182,8 +182,6 @@
       }
       mapTaskCtr = 0;
       redTaskCtr = 0;
-      super.setMaxVirtualMemoryForTask(jobConf.getMaxVirtualMemoryForTask());
-      super.setMaxPhysicalMemoryForTask(jobConf.getMaxPhysicalMemoryForTask());
     }
     
     @Override
@@ -232,7 +230,7 @@
       }
       return task;
     }
-    
+
     @Override
     public Task obtainNewReduceTask(final TaskTrackerStatus tts,
         int clusterSize, int ignored) throws IOException {
@@ -733,7 +731,7 @@
   private FakeJobInProgress submitJob(int state, JobConf jobConf) throws IOException {
     FakeJobInProgress job =
         new FakeJobInProgress(new JobID("test", ++jobCounter),
-            (jobConf == null ? new JobConf() : jobConf), taskTrackerManager,
+            (jobConf == null ? new JobConf(conf) : jobConf), taskTrackerManager,
             jobConf.getUser());
     job.getStatus().setRunState(state);
     taskTrackerManager.submitJob(job);
@@ -1504,12 +1502,6 @@
     LOG.debug("Starting the scheduler.");
     taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
 
-    // Limited TT - 1GB vmem and 512MB pmem
-    taskTrackerManager.getTaskTracker("tt1").getResourceStatus()
-        .setTotalVirtualMemory(1 * 1024 * 1024 * 1024L);
-    taskTrackerManager.getTaskTracker("tt1").getResourceStatus()
-        .setTotalPhysicalMemory(512 * 1024 * 1024L);
-
     taskTrackerManager.addQueues(new String[] { "default" });
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
@@ -1519,11 +1511,11 @@
     // memory-based scheduling disabled by default.
     scheduler.start();
 
-    LOG.debug("Submit one high memory(3GB vmem, 1GBpmem) job of 1 map task "
-        + "and 1 reduce task.");
+    LOG.debug("Submit one high memory job of 1 3GB map task "
+        + "and 1 1GB reduce task.");
     JobConf jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(3 * 1024 * 1024 * 1024L); // 3GB vmem
-    jConf.setMaxPhysicalMemoryForTask(1 * 1024 * 1024 * 1024L); // 1 GB pmem
+    jConf.setMemoryForMapTask(3 * 1024L); // 3GB
+    jConf.setMemoryForReduceTask(1 * 1024L); // 1 GB
     jConf.setNumMapTasks(1);
     jConf.setNumReduceTasks(1);
     jConf.setQueueName("default");
@@ -1538,197 +1530,57 @@
   }
 
   /**
-   * Test to verify that highPmemJobs are scheduled like all other jobs when
-   * physical-memory based scheduling is not enabled.
-   * @throws IOException
-   */
-  public void testDisabledPmemBasedScheduling()
-      throws IOException {
-
-    LOG.debug("Starting the scheduler.");
-    taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
-
-    // Limited TT - 100GB vmem and 500MB pmem
-    TaskTrackerStatus.ResourceStatus ttStatus =
-        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
-    ttStatus.setTotalVirtualMemory(100 * 1024 * 1024 * 1024L);
-    ttStatus.setReservedVirtualMemory(0);
-    ttStatus.setTotalPhysicalMemory(500 * 1024 * 1024L);
-    ttStatus.setReservedPhysicalMemory(0);
-
-    taskTrackerManager.addQueues(new String[] { "default" });
-    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
-    resConf.setFakeQueues(queues);
-    scheduler.setResourceManagerConf(resConf);
-    scheduler.setTaskTrackerManager(taskTrackerManager);
-    // enable vmem-based scheduling. pmem based scheduling disabled by default.
-    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        1536 * 1024 * 1024L);
-    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        3 * 1024 * 1024 * 1024L);
-    scheduler.start();
-
-    LOG.debug("Submit one high pmem(3GB vmem, 1GBpmem) job of 1 map task "
-        + "and 1 reduce task.");
-    JobConf jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(3 * 1024 * 1024 * 1024L); // 3GB vmem
-    jConf.setMaxPhysicalMemoryForTask(1 * 1024 * 1024 * 1024L); // 1 GB pmem
-    jConf.setNumMapTasks(1);
-    jConf.setNumReduceTasks(1);
-    jConf.setQueueName("default");
-    jConf.setUser("u1");
-    submitJobAndInit(JobStatus.RUNNING, jConf);
-
-    // assert that all tasks are launched even though they transgress the
-    // scheduling limits.
-
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
-  }
-
-  /**
-   * Test HighMemoryJobs.
-   * @throws IOException
-   */
-  public void testHighMemoryJobs()
-      throws IOException {
-
-    LOG.debug("Starting the scheduler.");
-    taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
-
-    TaskTrackerStatus.ResourceStatus ttStatus =
-        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
-    ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
-    ttStatus.setReservedVirtualMemory(0);
-    ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024L);
-    ttStatus.setReservedPhysicalMemory(0);
-    // Normal job on this TT would be 1.5GB vmem, 0.5GB pmem
-
-    taskTrackerManager.addQueues(new String[] { "default" });
-    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
-    resConf.setFakeQueues(queues);
-    scheduler.setTaskTrackerManager(taskTrackerManager);
-    // enabled memory-based scheduling
-    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        1536 * 1024 * 1024L);
-    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        3 * 1024 * 1024 * 1024L);
-    resConf.setDefaultPercentOfPmemInVmem(33.3f);
-    resConf.setLimitMaxPmemForTasks(1 * 1024 * 1024 * 1024L);
-    scheduler.setResourceManagerConf(resConf);
-    scheduler.start();
-
-    LOG.debug("Submit one high memory(1600MB vmem, 400MB pmem) job of "
-        + "1 map task and 1 reduce task.");
-    JobConf jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(1600 * 1024 * 1024L); // 1.6GB vmem
-    jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem
-    jConf.setNumMapTasks(1);
-    jConf.setNumReduceTasks(1);
-    jConf.setQueueName("default");
-    jConf.setUser("u1");
-    jConf.setMapSpeculativeExecution(false);
-    jConf.setReduceSpeculativeExecution(false);
-    FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-
-    // No more tasks of this job can run on the TT because of lack of vmem
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-
-    // Let attempt_test_0001_m_000001_0 finish, task assignment should succeed.
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", job1);
-    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
-
-    LOG.debug("Submit another high memory(1200MB vmem, 800MB pmem) job of "
-        + "1 map task and 0 reduces.");
-    jConf.setMaxVirtualMemoryForTask(1200 * 1024 * 1024L);
-    jConf.setMaxPhysicalMemoryForTask(800 * 1024 * 1024L);
-    jConf.setNumMapTasks(1);
-    jConf.setNumReduceTasks(0);
-    jConf.setQueueName("default");
-    jConf.setUser("u1");
-    jConf.setMapSpeculativeExecution(false);
-    jConf.setReduceSpeculativeExecution(false);
-    submitJobAndInit(JobStatus.PREP, jConf); // job2
-
-    // This job shouldn't run the TT now because of lack of pmem
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-
-    // Let attempt_test_0001_m_000002_0 finish, task assignment should succeed.
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", job1);
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-
-    LOG.debug("Submit a normal memory(200MB vmem, 100MB pmem) job of "
-        + "0 maps and 1 reduce task.");
-    jConf.setMaxVirtualMemoryForTask(200 * 1024 * 1024L);
-    jConf.setMaxPhysicalMemoryForTask(100 * 1024 * 1024L);
-    jConf.setNumMapTasks(0);
-    jConf.setNumReduceTasks(1);
-    jConf.setQueueName("default");
-    jConf.setUser("u1");
-    submitJobAndInit(JobStatus.PREP, jConf); // job3
-
-    checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1");
-  }
-
-  /**
-   * Test HADOOP-4979. 
-   * Bug fix for making sure we always return null to TT if there is a 
-   * high-mem job, and not look at reduce jobs (if map tasks are high-mem)
-   * or vice-versa.
+   * Test reverting HADOOP-4979. If there is a high-mem job, we should now look
+   * at reduce jobs (if map tasks are high-mem) or vice-versa.
+   * 
    * @throws IOException
    */
-  public void testHighMemoryBlocking()
+  public void testHighMemoryBlockingAcrossTaskTypes()
       throws IOException {
 
     // 2 map and 1 reduce slots
     taskTrackerManager = new FakeTaskTrackerManager(1, 2, 1);
 
-    TaskTrackerStatus.ResourceStatus ttStatus =
-        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
-    ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
-    ttStatus.setReservedVirtualMemory(0);
-    ttStatus.setTotalPhysicalMemory(1536 * 1024 * 1024L);
-    ttStatus.setReservedPhysicalMemory(0);
-    // Normal job on this TT would be 1GB vmem, 0.5GB pmem
-
     taskTrackerManager.addQueues(new String[] { "default" });
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
     resConf.setFakeQueues(queues);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     // enabled memory-based scheduling
-    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        1 * 1024 * 1024 * 1024L);
-    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        3 * 1024 * 1024 * 1024L);
-    resConf.setDefaultPercentOfPmemInVmem(33.3f);
-    resConf.setLimitMaxPmemForTasks(1536 * 1024 * 1024L);
+    // Normal job in the cluster would be 1GB maps/reduces
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+        2 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+        1 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
-    // We need a situation where the scheduler needs to run a map task, 
-    // but the available one has a high-mem requirement. There should
-    // be another job whose maps or reduces can run, but they shouldn't 
-    // be scheduled.
+    // The situation : Two jobs in the queue. First job with only maps and no
+    // reduces and is a high memory job. Second job is a normal job with both maps and reduces.
+    // First job cannot run for want of memory for maps. In this case, second job's reduces should run.
     
-    LOG.debug("Submit one high memory(2GB vmem, 400MB pmem) job of "
+    LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of "
         + "2 map tasks");
-    JobConf jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L); // 2GB vmem
-    jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem
+    JobConf jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(2 * 1024);
+    jConf.setMemoryForReduceTask(0);
     jConf.setNumMapTasks(2);
     jConf.setNumReduceTasks(0);
     jConf.setQueueName("default");
     jConf.setUser("u1");
     FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
-    LOG.debug("Submit another regular memory(900MB vmem, 200MB pmem) job of "
+
+    LOG.debug("Submit another regular memory(1GB vmem maps/reduces) job of "
         + "2 map/red tasks");
-    jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(900 * 1024 * 1024L); // 900MB vmem
-    jConf.setMaxPhysicalMemoryForTask(200 * 1024 * 1024L); // 200MB pmem
+    jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(1 * 1024);
+    jConf.setMemoryForReduceTask(1 * 1024);
     jConf.setNumMapTasks(2);
     jConf.setNumReduceTasks(2);
     jConf.setQueueName("default");
@@ -1738,75 +1590,8 @@
     // first, a map from j1 will run
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     // at this point, the scheduler tries to schedule another map from j1. 
-    // there isn't enough space. There is space to run the second job's
-    // map or reduce task, but they shouldn't be scheduled
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-  }
-  
-  /**
-   * test invalid highMemoryJobs
-   * @throws IOException
-   */
-  public void testHighMemoryJobWithInvalidRequirements()
-      throws IOException {
-    LOG.debug("Starting the scheduler.");
-    taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
-    TaskTrackerStatus.ResourceStatus ttStatus =
-        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
-    ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024);
-    ttStatus.setReservedVirtualMemory(0);
-    ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024);
-    ttStatus.setReservedPhysicalMemory(0);
-
-    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
-    taskTrackerManager.addQueues(new String[] { "default" });
-    resConf.setFakeQueues(queues);
-    scheduler.setTaskTrackerManager(taskTrackerManager);
-    // enabled memory-based scheduling
-    long vmemUpperLimit = 1 * 1024 * 1024 * 1024L;
-    long vmemDefault = 1536 * 1024 * 1024L;
-    long pmemUpperLimit = vmemUpperLimit;
-    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        vmemDefault);
-    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        vmemUpperLimit);
-    resConf.setDefaultPercentOfPmemInVmem(33.3f);
-    resConf.setLimitMaxPmemForTasks(pmemUpperLimit);
-    scheduler.setResourceManagerConf(resConf);
-    scheduler.start();
-
-    LOG.debug("Submit one invalid high ram(5GB vmem, 3GB pmem) job of "
-        + "1 map, 0 reduce tasks.");
-    long jobMaxVmem = 5 * 1024 * 1024 * 1024L;
-    long jobMaxPmem = 3 * 1024 * 1024 * 1024L;
-    JobConf jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(jobMaxVmem);
-    jConf.setMaxPhysicalMemoryForTask(jobMaxPmem);
-    jConf.setNumMapTasks(1);
-    jConf.setNumReduceTasks(0);
-    jConf.setQueueName("default");
-    jConf.setUser("u1");
-
-    boolean throwsException = false;
-    String msg = null;
-    FakeJobInProgress job;
-    try {
-      job = submitJob(JobStatus.PREP, jConf);
-    } catch (IOException ioe) {
-      // job has to fail
-      throwsException = true;
-      msg = ioe.getMessage();
-    }
-
-    assertTrue(throwsException);
-    job = (FakeJobInProgress) taskTrackerManager.getJobs().toArray()[0];
-    assertTrue(msg.matches(job.getJobID() + " \\(" + jobMaxVmem + "vmem, "
-        + jobMaxPmem + "pmem\\) exceeds the cluster's max-memory-limits \\("
-        + vmemUpperLimit + "vmem, " + pmemUpperLimit
-        + "pmem\\). Cannot run in this cluster, so killing it."));
-    // For job, no cleanup task needed so gets killed immediately.
-    assertTrue(job.getStatus().getRunState() == JobStatus.KILLED);
+    // there isn't enough space. The second job's reduce should be scheduled.
+    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
   }
 
   /**
@@ -1817,13 +1602,7 @@
       throws IOException {
 
     LOG.debug("Starting the scheduler.");
-    taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
-    TaskTrackerStatus.ResourceStatus ttStatus =
-        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
-    ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
-    ttStatus.setReservedVirtualMemory(0);
-    ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024L);
-    ttStatus.setReservedPhysicalMemory(0);
+    taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
 
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
@@ -1831,68 +1610,65 @@
     resConf.setFakeQueues(queues);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     // enabled memory-based scheduling
-    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        1536 * 1024 * 1024L);
-    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        4 * 1024 * 1024 * 1024L);
-    resConf.setDefaultPercentOfPmemInVmem(33.3f);
-    resConf.setLimitMaxPmemForTasks(2 * 1024 * 1024 * 1024L);
+    // Normal jobs 1GB maps/reduces. 2GB limit on maps/reduces
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+        2 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+        2 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
-    LOG.debug("Submit one high memory(4GB vmem, 512MB pmem) job of "
+    LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
         + "1 map, 0 reduce tasks.");
-    JobConf jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(4 * 1024 * 1024 * 1024L);
-    jConf.setMaxPhysicalMemoryForTask(512 * 1024 * 1024L);
+    JobConf jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(1 * 1024);
+    jConf.setMemoryForReduceTask(1 * 1024);
     jConf.setNumMapTasks(1);
-    jConf.setNumReduceTasks(0);
+    jConf.setNumReduceTasks(1);
     jConf.setQueueName("default");
     jConf.setUser("u1");
     FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
-    // TTs should not run these jobs i.e. cluster blocked because of lack of
-    // vmem
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-    assertNull(scheduler.assignTasks(tracker("tt1")));
 
-    // Job should still be alive
-    assertTrue(job1.getStatus().getRunState() == JobStatus.RUNNING);
-
-    LOG.debug("Submit a normal job of 1 map, 0 reduce tasks.");
-    // Use cluster-wide defaults
-    jConf.setMaxVirtualMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
-    jConf.setMaxPhysicalMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
+    // Fill the second tt with this job.
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+
+    LOG.debug("Submit one high memory(2GB maps/reduces) job of "
+        + "2 map, 2 reduce tasks.");
+    jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(2 * 1024);
+    jConf.setMemoryForReduceTask(2 * 1024);
+    jConf.setNumMapTasks(2);
+    jConf.setNumReduceTasks(2);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
     FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
 
-    // cluster should still be blocked for job1 and so even job2 should not run
-    // even though it is a normal job
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-
-    scheduler.taskTrackerManager.killJob(job2.getJobID());
-    scheduler.taskTrackerManager.killJob(job1.getJobID());
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
 
-    LOG.debug("Submit one high memory(2GB vmem, 2GB pmem) job of "
+    LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
         + "1 map, 0 reduce tasks.");
-    jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L);
-    jConf.setMaxPhysicalMemoryForTask(2 * 1024 * 1024 * 1024L);
+    jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(1 * 1024);
+    jConf.setMemoryForReduceTask(1 * 1024);
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(1);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
     FakeJobInProgress job3 = submitJobAndInit(JobStatus.PREP, jConf);
-    // TTs should not run these jobs i.e. cluster blocked because of lack of
-    // pmem now.
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-    
-    // Job should still be alive
-    assertTrue(job3.getStatus().getRunState() == JobStatus.RUNNING);
 
-    LOG.debug("Submit a normal job of 1 map, 0 reduce tasks.");
-    // Use cluster-wide defaults
-    jConf.setMaxVirtualMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
-    jConf.setMaxPhysicalMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
-    submitJobAndInit(JobStatus.PREP, jConf); // job4
-
-    // cluster should still be blocked for job3 and so even job4 should not run
-    // even though it is a normal job
+    // Job2 cannot fit on tt2 or tt1. Blocking. Job3 also will not run.
     assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertNull(scheduler.assignTasks(tracker("tt2")));
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertNull(scheduler.assignTasks(tracker("tt2")));
   }
 
   /**
@@ -1906,13 +1682,6 @@
     // create a cluster with a single node.
     LOG.debug("Starting cluster with 1 tasktracker, 2 map and 2 reduce slots");
     taskTrackerManager = new FakeTaskTrackerManager(1, 2, 2);
-    TaskTrackerStatus.ResourceStatus ttStatus =
-        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
-    LOG.debug("Assume TT has 4 GB virtual mem and 2 GB RAM");
-    ttStatus.setTotalVirtualMemory(4 * 1024 * 1024 * 1024L);
-    ttStatus.setReservedVirtualMemory(0);
-    ttStatus.setTotalPhysicalMemory(2 * 1024 * 1024 * 1024L);
-    ttStatus.setReservedPhysicalMemory(0);
 
     // create scheduler
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
@@ -1921,14 +1690,17 @@
     resConf.setFakeQueues(queues);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     // enabled memory-based scheduling
-    LOG.debug("By default, jobs get 0.5 GB per task vmem" +
-        " and 2 GB max vmem, with 50% of it for RAM");
-    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        512 * 1024 * 1024L);
-    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        2 * 1024 * 1024 * 1024L);
-    resConf.setDefaultPercentOfPmemInVmem(50.0f);
-    resConf.setLimitMaxPmemForTasks(1 * 1024 * 1024 * 1024L);
+    LOG.debug("Assume TT has 2GB for maps and 2GB for reduces");
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+        2 * 1024L);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 512);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+        2 * 1024L);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 512);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
     
@@ -1937,6 +1709,8 @@
     JobConf jConf = new JobConf();
     jConf.setNumMapTasks(2);
     jConf.setNumReduceTasks(2);
+    jConf.setMemoryForMapTask(512);
+    jConf.setMemoryForReduceTask(512);
     jConf.setQueueName("default");
     jConf.setUser("u1");
     FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
@@ -1955,6 +1729,8 @@
     jConf = new JobConf();
     jConf.setNumMapTasks(1);
     jConf.setNumReduceTasks(1);
+    jConf.setMemoryForMapTask(512);
+    jConf.setMemoryForReduceTask(512);
     jConf.setQueueName("default");
     jConf.setUser("u1");
     FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
@@ -2338,21 +2114,8 @@
    */
   public void testHighRamJobWithSpeculativeExecution() throws IOException {
     // 2 map and 2 reduce slots
-    taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
-    
-    //task tracker memory configurations.
-    TaskTrackerStatus.ResourceStatus ttStatus =
-        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
-    ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
-    ttStatus.setReservedVirtualMemory(0);
-    ttStatus.setTotalPhysicalMemory(1536 * 1024 * 1024L);
-    ttStatus.setReservedPhysicalMemory(0);
-    ttStatus = taskTrackerManager.getTaskTracker("tt2").getResourceStatus();
-    ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
-    ttStatus.setReservedVirtualMemory(0);
-    ttStatus.setTotalPhysicalMemory(1536 * 1024 * 1024L);
-    ttStatus.setReservedPhysicalMemory(0);
-   
+    taskTrackerManager = new FakeTaskTrackerManager(2, 3, 3);
+    // 1GB for each map, 1GB for each reduce
 
     taskTrackerManager.addQueues(new String[] { "default" });
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
@@ -2360,19 +2123,23 @@
     resConf.setFakeQueues(queues);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     // enabled memory-based scheduling
-    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        1 * 1024 * 1024 * 1024L);
-    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        3 * 1024 * 1024 * 1024L);
-    resConf.setDefaultPercentOfPmemInVmem(33.3f);
-    resConf.setLimitMaxPmemForTasks(1536 * 1024 * 1024L);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+        3 * 1024L);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024L);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+        3 * 1024L);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024L);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
-    
+    //Submit a high memory job with speculative tasks.
     JobConf jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L); // 2GB vmem
-    jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem
+    jConf.setMemoryForMapTask(2 * 1024);
+    jConf.setMemoryForReduceTask(0);
     jConf.setNumMapTasks(1);
     jConf.setNumReduceTasks(0);
     jConf.setQueueName("default");
@@ -2381,20 +2148,18 @@
     jConf.setReduceSpeculativeExecution(false);
     FakeJobInProgress job1 = new FakeJobInProgress(new JobID("test", ++jobCounter),
           jConf, taskTrackerManager,"u1");
-    
-    //Submit a high memory job with speculative tasks.
     taskTrackerManager.submitJob(job1);
-    
+
+    //Submit normal job
     jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(100 * 1024 * 1024L); // 100MB vmem
-    jConf.setMaxPhysicalMemoryForTask(50 * 1024 * 1024L); // 50MB pmem
+    jConf.setMemoryForMapTask(1 * 1024);
+    jConf.setMemoryForReduceTask(0);
     jConf.setNumMapTasks(1);
     jConf.setNumReduceTasks(0);
     jConf.setQueueName("default");
     jConf.setUser("u1");
     jConf.setMapSpeculativeExecution(false);
     jConf.setReduceSpeculativeExecution(false);
-    //Submit normal job
     FakeJobInProgress job2 = submitJob(JobStatus.PREP, jConf);
 
     controlledInitializationPoller.selectJobsToInitialize();
@@ -2429,8 +2194,8 @@
     
     //Now submit high ram job with speculative reduce and check.
     jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L); // 2GB vmem
-    jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem
+    jConf.setMemoryForMapTask(2 * 1024);
+    jConf.setMemoryForReduceTask(2 * 1024L);
     jConf.setNumMapTasks(1);
     jConf.setNumReduceTasks(1);
     jConf.setQueueName("default");
@@ -2439,76 +2204,43 @@
     jConf.setReduceSpeculativeExecution(true);
     FakeJobInProgress job3 = new FakeJobInProgress(new JobID("test", ++jobCounter),
           jConf, taskTrackerManager,"u1");
-    
-    //Submit a high memory job with speculative reduce tasks.
     taskTrackerManager.submitJob(job3);
-    
+
+    //Submit normal job w.r.t reduces
     jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(100 * 1024 * 1024L); // 100MB vmem
-    jConf.setMaxPhysicalMemoryForTask(50 * 1024 * 1024L); // 50MB pmem
+    jConf.setMemoryForMapTask(2 * 1024L);
+    jConf.setMemoryForReduceTask(1 * 104L);
     jConf.setNumMapTasks(1);
     jConf.setNumReduceTasks(1);
     jConf.setQueueName("default");
     jConf.setUser("u1");
     jConf.setMapSpeculativeExecution(false);
     jConf.setReduceSpeculativeExecution(false);
-    //Submit normal job
     FakeJobInProgress job4 = submitJob(JobStatus.PREP, jConf);
     
     controlledInitializationPoller.selectJobsToInitialize();
     raiseStatusChangeEvents(scheduler.jobQueuesManager);
-    //all maps of jobs get assigned to same task tracker as
-    //job does not have speculative map and same tracker sends two heart
-    //beat back to back.
+
+    // Finish up the map scheduler
     checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
-    //first map slot gets attention on this tracker.
     checkAssignment("tt2", "attempt_test_0004_m_000001_0 on tt2");
-    //now first reduce of the job3 would be scheduled on tt2 since it has
-    //memory.
-    //assigntasks() would check for free reduce slot is greater than
-    //map slots. Seeing there is more free reduce slot it would try scheduling
-    //reduce of job1 but would block as in it is a high memory task.
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-    //TT2 would get the reduce task from high memory job as the tt is running
-    //normal jobs map. which is low mem.
-    checkAssignment("tt2", "attempt_test_0003_r_000001_0 on tt2");
-    // now if either TT comes back, it will block because all maps
-    // are done, and the first jobs reduce has a speculative task.
+
+    // first, a reduce from j3 will run
+    // at this point, there is a speculative task for the same job to be
+    //scheduled. This task would be scheduled. Till the tasks from job3 gets
+    //complete none of the tasks from other jobs would be scheduled.
+    checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1");
+    assertEquals("pending reduces greater than zero " , job3.pendingMaps(), 0);
+    //make same tracker get back, check if you are blocking. Your job
+    //has speculative reduce task so tracker should be blocked even tho' it
+    //can run job4's reduce.
     assertNull(scheduler.assignTasks(tracker("tt1")));
-    assertNull(scheduler.assignTasks(tracker("tt2"))); 
-    //finish maps.
-    taskTrackerManager.finishTask("tt1", "attempt_test_0003_m_000001_0", 
-        job3);
-    taskTrackerManager.finishTask("tt2", "attempt_test_0004_m_000001_0", 
-        job4);
-    //check speculative reduce code path is covered.
-    assertEquals("Pending reduces not zero for high " +
-    		"ram job with speculative reduce.", 0, job3.pendingReduces());
-    //if tt2 returns back it is not given any task even if it can schedule
-    //job2 reduce.
-    assertNull(scheduler.assignTasks(tracker("tt2")));
-    //speculative reduce of the job3 would be scheduled.
-    checkAssignment("tt1", "attempt_test_0003_r_000001_1 on tt1");
-    //now both speculative and actual task have been scheduled for job3.
-    //Normal task of Job4 would now be scheduled on TT1 as it has free space
-    //to run.
+    //TT2 now gets speculative map of the job1
+    checkAssignment("tt2", "attempt_test_0003_r_000001_1 on tt2");
+
+    // Now since j3 has no more speculative reduces, it can schedule
+    // the j4.
     checkAssignment("tt1", "attempt_test_0004_r_000001_0 on tt1");
-    //No more tasks.
-    assertNull(scheduler.assignTasks(tracker("tt2")));
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-    
-    //finish all the reduces.
-    taskTrackerManager.finishTask("tt1", "attempt_test_0003_r_000001_1", 
-        job3);
-    taskTrackerManager.finishTask("tt2", "attempt_test_0003_r_000001_0", 
-        job3);
-    //finish the job
-    taskTrackerManager.finalizeJob(job3);
-    //finish the task and the job.
-    taskTrackerManager.finishTask("tt1", "attempt_test_0004_r_000001_0", 
-        job4);
-    taskTrackerManager.finalizeJob(job4);
-    
   }
 
   private void checkRunningJobMovementAndCompletion() throws IOException {

Modified: hadoop/core/trunk/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/mapred-default.xml?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/mapred-default.xml (original)
+++ hadoop/core/trunk/src/mapred/mapred-default.xml Tue May 26 13:30:37 2009
@@ -186,42 +186,6 @@
 </property>
 
 <property>
-  <name>mapred.tasktracker.pmem.reserved</name>
-  <value>-1</value>
-  <description>Configuration property to specify the amount of physical memory
-    that has to be reserved by the TaskTracker for system usage (OS, TT etc).
-    The reserved physical memory should be a part of the total physical memory
-    available on the TaskTracker.
-
-    The reserved physical memory and the total physical memory values are
-    reported by the TaskTracker as part of heart-beat so that they can
-    considered by a scheduler. Please refer to the documentation of the
-    configured scheduler to see how this property is used.
-  </description>
-</property>
-
-<property>
-  <name>mapred.task.default.maxvmem</name>
-  <value>-1</value>
-  <description>
-    Cluster-wide configuration in bytes to be set by the administrators that
-    provides default amount of maximum virtual memory for job's tasks. This has
-    to be set on both the JobTracker node for the sake of scheduling decisions
-    and on the TaskTracker nodes for the sake of memory management.
-
-    If a job doesn't specify its virtual memory requirement by setting
-    mapred.task.maxvmem to -1, tasks are assured a memory limit set
-    to this property. This property is set to -1 by default.
-
-    This value should in general be less than the cluster-wide
-    configuration mapred.task.limit.maxvmem. If not or if it is not set,
-    TaskTracker's memory management will be disabled and a scheduler's memory
-    based scheduling decisions may be affected. Please refer to the
-    documentation of the configured scheduler to see how this property is used.
-  </description>
-</property>
-
-<property>
   <name>mapred.task.limit.maxvmem</name>
   <value>-1</value>
   <description>
@@ -272,23 +236,6 @@
 </property>
 
 <property>
-  <name>mapred.task.maxpmem</name>name>
-  <value>-1</value>
-  <description>
-   The maximum amount of physical memory any task of a job will use in bytes.
-
-   This value may be used by schedulers that support scheduling based on job's
-   memory requirements. In general, a task of this job will be scheduled on a
-   TaskTracker, only if the amount of physical memory still unoccupied on the
-   TaskTracker is greater than or equal to this value. Different schedulers can
-   take different decisions, some might just ignore this value. Please refer to
-   the documentation of the scheduler being configured to see if it does
-   memory based scheduling and if it does, how this variable is used by that
-   scheduler.
-  </description>
-</property>
-
-<property>
   <name>mapred.tasktracker.memory_calculator_plugin</name>
   <value></value>
   <description>

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java Tue May 26 13:30:37 2009
@@ -123,111 +123,11 @@
    */
   public static final String DEFAULT_QUEUE_NAME = "default";
 
-  /**
-   * Cluster-wide configuration to be set by the administrators that provides
-   * default amount of maximum virtual memory for job's tasks. This has to be
-   * set on both the JobTracker node for the sake of scheduling decisions and on
-   * the TaskTracker nodes for the sake of memory management.
-   * 
-   * <p>
-   * 
-   * If a job doesn't specify its virtual memory requirement by setting
-   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to {@link #DISABLED_MEMORY_LIMIT},
-   * tasks are assured a memory limit set to this property. This property is
-   * disabled by default, and if not explicitly set to a valid value by the
-   * administrators and if a job doesn't specify its virtual memory
-   * requirements, the job's tasks will not be assured anything and may be
-   * killed by a TT that intends to control the total memory usage of the tasks
-   * via memory management functionality.
-   * 
-   * <p>
-   * 
-   * This value should in general be less than the cluster-wide configuration
-   * {@link #UPPER_LIMIT_ON_TASK_VMEM_PROPERTY} . If not or if it not set,
-   * TaskTracker's memory management may be disabled and a scheduler's memory
-   * based scheduling decisions will be affected. Please refer to the
-   * documentation of the configured scheduler to see how this property is used.
-   */
-  public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY =
-      "mapred.task.default.maxvmem";
+  static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY =
+      "mapred.job.map.memory.mb";
 
-  /**
-   * The maximum amount of memory any task of this job will use.
-   * 
-   * <p>
-   * 
-   * This value will be used by TaskTrackers for monitoring the memory usage of
-   * tasks of this jobs. If a TaskTracker's memory management functionality is
-   * enabled, each task of this job will be allowed to use a maximum virtual
-   * memory specified by this property. If the task's memory usage goes over
-   * this value, the task will be failed by the TT. If not set, the cluster-wide
-   * configuration {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} is used as the
-   * default value for memory requirements. If this property cascaded with
-   * {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} becomes equal to -1, job's
-   * tasks will not be assured anything and may be killed by a TT that intends
-   * to control the total memory usage of the tasks via memory management
-   * functionality. If the memory management functionality is disabled on a TT,
-   * this value is ignored.
-   * 
-   * <p>
-   * 
-   * This value should also be not more than the cluster-wide configuration
-   * {@link #UPPER_LIMIT_ON_TASK_VMEM_PROPERTY} which has to be set by the site
-   * administrators.
-   * 
-   * <p>
-   * 
-   * This value may be used by schedulers that support scheduling based on job's
-   * memory requirements. In general, a task of this job will be scheduled on a
-   * TaskTracker only if the amount of virtual memory still unoccupied on the
-   * TaskTracker is greater than or equal to this value. But different
-   * schedulers can take different decisions. Please refer to the documentation
-   * of the scheduler being configured to see if it does memory based scheduling
-   * and if it does, how this property is used by that scheduler.
-   * 
-   * @see #setMaxVirtualMemoryForTask(long)
-   * @see #getMaxVirtualMemoryForTask()
-   */
-  public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
-      "mapred.task.maxvmem";
-
-  /**
-   * The maximum amount of physical memory any task of a job will use.
-   * 
-   * <p>
-   * 
-   * This value may be used by schedulers that support scheduling based on job's
-   * memory requirements. In general, a task of this job will be scheduled on a
-   * TaskTracker, only if the amount of physical memory still unoccupied on the
-   * TaskTracker is greater than or equal to this value. But different
-   * schedulers can take different decisions. Please refer to the documentation
-   * of the scheduler being configured to see how it does memory based
-   * scheduling and how this variable is used by that scheduler.
-   * 
-   * @see #setMaxPhysicalMemoryForTask(long)
-   * @see #getMaxPhysicalMemoryForTask()
-   */
-  public static final String MAPRED_TASK_MAXPMEM_PROPERTY =
-      "mapred.task.maxpmem";
-
-  /**
-   * Cluster-wide configuration to be set by the site administrators that
-   * provides an upper limit on the maximum virtual memory that can be specified
-   * by a job. The job configuration {@link #MAPRED_TASK_MAXVMEM_PROPERTY} and
-   * the cluster-wide configuration
-   * {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} should, by definition, be
-   * less than this value. If the job configuration
-   * {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} is more than this value,
-   * depending on the scheduler being configured, the job may be rejected or the
-   * job configuration may just be ignored.
-   * 
-   * <p>
-   * 
-   * If it is not set on a TaskTracker, TaskTracker's memory management will be
-   * disabled.
-   */
-  public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY =
-      "mapred.task.limit.maxvmem";
+  static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
+      "mapred.job.reduce.memory.mb";
 
   /**
    * Construct a map/reduce job configuration.
@@ -1491,53 +1391,23 @@
   public String getJobLocalDir() {
     return get("job.local.dir");
   }
-  
-  /**
-   * The maximum amount of memory any task of this job will use. See
-   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
-   * 
-   * @return The maximum amount of memory any task of this job will use, in
-   *         bytes.
-   * @see #setMaxVirtualMemoryForTask(long)
-   */
-  public long getMaxVirtualMemoryForTask() {
-    return getLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
+
+  long getMemoryForMapTask() {
+    return getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
+        DISABLED_MEMORY_LIMIT);
   }
 
-  /**
-   * Set the maximum amount of memory any task of this job can use. See
-   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
-   * 
-   * @param vmem Maximum amount of virtual memory in bytes any task of this job
-   *          can use.
-   * @see #getMaxVirtualMemoryForTask()
-   */
-  public void setMaxVirtualMemoryForTask(long vmem) {
-    setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, vmem);
+  void setMemoryForMapTask(long mem) {
+    setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem);
   }
 
-  /**
-   * The maximum amount of physical memory any task of this job will use. See
-   * {@link #MAPRED_TASK_MAXPMEM_PROPERTY}
-   * 
-   * @return The maximum amount of physical memory any task of this job will
-   *         use, in bytes.
-   * @see #setMaxPhysicalMemoryForTask(long)
-   */
-  public long getMaxPhysicalMemoryForTask() {
-    return getLong(JobConf.MAPRED_TASK_MAXPMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
+  long getMemoryForReduceTask() {
+    return getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
+        DISABLED_MEMORY_LIMIT);
   }
 
-  /**
-   * Set the maximum amount of physical memory any task of this job can use. See
-   * {@link #MAPRED_TASK_MAXPMEM_PROPERTY}
-   * 
-   * @param pmem Maximum amount of physical memory in bytes any task of this job
-   *          can use.
-   * @see #getMaxPhysicalMemoryForTask()
-   */
-  public void setMaxPhysicalMemoryForTask(long pmem) {
-    setLong(JobConf.MAPRED_TASK_MAXPMEM_PROPERTY, pmem);
+  void setMemoryForReduceTask(long mem) {
+    setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
   }
 
   /**
@@ -1559,6 +1429,19 @@
     set("mapred.job.queue.name", queueName);
   }
   
+  /**
+   * Normalize the negative values in configuration
+   * 
+   * @param val
+   * @return normalized value
+   */
+  public static long normalizeMemoryConfigValue(long val) {
+    if (val < 0) {
+      val = DISABLED_MEMORY_LIMIT;
+    }
+    return val;
+  }
+
   /** 
    * Find a jar that contains a class of the same name, if any.
    * It will return a jar file, even if that is not the first thing

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Tue May 26 13:30:37 2009
@@ -178,8 +178,6 @@
   private boolean hasSpeculativeMaps;
   private boolean hasSpeculativeReduces;
   private long inputLength = 0;
-  private long maxVirtualMemoryForTask;
-  private long maxPhysicalMemoryForTask;
   
   private Counters jobCounters = new Counters();
   
@@ -276,8 +274,6 @@
     this.nonRunningReduces = new LinkedList<TaskInProgress>();    
     this.runningReduces = new LinkedHashSet<TaskInProgress>();
     this.resourceEstimator = new ResourceEstimator(this);
-    setMaxVirtualMemoryForTask(conf.getMaxVirtualMemoryForTask());
-    setMaxPhysicalMemoryForTask(conf.getMaxPhysicalMemoryForTask());
   }
 
   /**
@@ -418,6 +414,7 @@
                 (numMapTasks + numReduceTasks) +
                 " exceeds the configured limit " + maxTasks);
     }
+
     jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
     jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
     
@@ -559,23 +556,6 @@
     JobHistory.JobInfo.logJobPriority(jobId, priority);
   }
 
-  // Accessors for resources.
-  long getMaxVirtualMemoryForTask() {
-    return maxVirtualMemoryForTask;
-  }
-
-  void setMaxVirtualMemoryForTask(long maxVMem) {
-    maxVirtualMemoryForTask = maxVMem;
-  }
-
-  long getMaxPhysicalMemoryForTask() {
-    return maxPhysicalMemoryForTask;
-  }
-
-  void setMaxPhysicalMemoryForTask(long maxPMem) {
-    maxPhysicalMemoryForTask = maxPMem;
-  }
-
   // Update the job start/launch time (upon restart) and log to history
   synchronized void updateJobInfo(long startTime, long launchTime) {
     // log and change to the job's start/launch time

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue May 26 13:30:37 2009
@@ -1530,6 +1530,11 @@
   private final UserGroupInformation mrOwner;
   private final String supergroup;
 
+  long limitMaxMemForMapTasks;
+  long limitMaxMemForReduceTasks;
+  long memSizeForMapSlotOnJT;
+  long memSizeForReduceSlotOnJT;
+
   private QueueManager queueManager;
 
   /**
@@ -1568,6 +1573,8 @@
     this.conf = conf;
     JobConf jobConf = new JobConf(conf);
 
+    initializeTaskMemoryRelatedConfig();
+
     // Read the hosts/exclude files to restrict access to the jobtracker.
     this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
                                            conf.get("mapred.hosts.exclude", ""));
@@ -3040,6 +3047,15 @@
       throw ioe;
     }
 
+    // Check the job if it cannot run in the cluster because of invalid memory
+    // requirements.
+    try {
+      checkMemoryRequirements(job);
+    } catch (IOException ioe) {
+      new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));
+      throw ioe;
+    }
+
    return addJob(jobId, job); 
   }
 
@@ -3303,6 +3319,16 @@
   }
   
   TaskCompletionEvent[] EMPTY_EVENTS = new TaskCompletionEvent[0];
+
+  static final String MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY =
+      "mapred.cluster.map.memory.mb";
+  static final String MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY =
+      "mapred.cluster.reduce.memory.mb";
+
+  static final String MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY =
+      "mapred.cluster.max.map.memory.mb";
+  static final String MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY =
+      "mapred.cluster.max.reduce.memory.mb";
   
   /* 
    * Returns a list of TaskCompletionEvent for the given job, 
@@ -3774,4 +3800,81 @@
         UserGroupInformation.getCurrentUGI().getUserName());
     this.queueManager.refreshAcls(new Configuration(this.conf));
   }
+
+  private void initializeTaskMemoryRelatedConfig() {
+    memSizeForMapSlotOnJT =
+        JobConf.normalizeMemoryConfigValue(conf.getLong(
+            JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+    memSizeForReduceSlotOnJT =
+        JobConf.normalizeMemoryConfigValue(conf.getLong(
+            JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+    limitMaxMemForMapTasks =
+        JobConf.normalizeMemoryConfigValue(conf.getLong(
+            JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+    limitMaxMemForReduceTasks =
+        JobConf.normalizeMemoryConfigValue(conf.getLong(
+            JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+    LOG.info(new StringBuilder().append("Scheduler configured with ").append(
+        "(memSizeForMapSlotOnJT, memSizeForReduceSlotOnJT,").append(
+        " limitMaxMemForMapTasks, limitMaxMemForReduceTasks) (").append(
+        memSizeForMapSlotOnJT).append(", ").append(memSizeForReduceSlotOnJT)
+        .append(", ").append(limitMaxMemForMapTasks).append(", ").append(
+            limitMaxMemForReduceTasks).append(")"));
+  }
+
+  private boolean perTaskMemoryConfigurationSetOnJT() {
+    if (limitMaxMemForMapTasks == JobConf.DISABLED_MEMORY_LIMIT
+        || limitMaxMemForReduceTasks == JobConf.DISABLED_MEMORY_LIMIT
+        || memSizeForMapSlotOnJT == JobConf.DISABLED_MEMORY_LIMIT
+        || memSizeForReduceSlotOnJT == JobConf.DISABLED_MEMORY_LIMIT) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Check the job if it has invalid requirements and throw and IOException if does have.
+   * 
+   * @param job
+   * @throws IOException 
+   */
+  private void checkMemoryRequirements(JobInProgress job)
+      throws IOException {
+    if (!perTaskMemoryConfigurationSetOnJT()) {
+      LOG.debug("Per-Task memory configuration is not set on JT. "
+          + "Not checking the job for invalid memory requirements.");
+      return;
+    }
+
+    boolean invalidJob = false;
+    String msg = "";
+    long maxMemForMapTask = job.getJobConf().getMemoryForMapTask();
+    long maxMemForReduceTask = job.getJobConf().getMemoryForReduceTask();
+
+    if (maxMemForMapTask == JobConf.DISABLED_MEMORY_LIMIT
+        || maxMemForReduceTask == JobConf.DISABLED_MEMORY_LIMIT) {
+      invalidJob = true;
+      msg = "Invalid job requirements.";
+    }
+
+    if (maxMemForMapTask > limitMaxMemForMapTasks
+        || maxMemForReduceTask > limitMaxMemForReduceTasks) {
+      invalidJob = true;
+      msg = "Exceeds the cluster's max-memory-limit.";
+    }
+
+    if (invalidJob) {
+      StringBuilder jobStr =
+          new StringBuilder().append(job.getJobID().toString()).append("(")
+              .append(maxMemForMapTask).append(" memForMapTasks ").append(
+                  maxMemForReduceTask).append(" memForReduceTasks): ");
+      LOG.warn(jobStr.toString() + msg);
+
+      throw new IOException(jobStr.toString() + msg);
+    }
+  }
 }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Tue May 26 13:30:37 2009
@@ -59,8 +59,7 @@
     tasksToBeRemoved = new ArrayList<TaskAttemptID>();
 
     maxMemoryAllowedForAllTasks =
-        taskTracker.getTotalVirtualMemoryOnTT()
-            - taskTracker.getReservedVirtualMemory();
+        taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L;
 
     monitoringInterval = taskTracker.getJobConf().getLong(
         "mapred.tasktracker.taskmemorymanager.monitoring-interval", 5000L);
@@ -202,17 +201,6 @@
           LOG.info("Memory usage of ProcessTree " + pId + " :"
               + currentMemUsage + "bytes. Limit : " + limit + "bytes");
 
-          if (limit > taskTracker.getLimitMaxVMemPerTask()) {
-            // TODO: With monitoring enabled and no scheduling based on
-            // memory,users can seriously hijack the system by specifying memory
-            // requirements well above the cluster wide limit. Ideally these
-            // jobs should have been rejected by JT/scheduler. Because we can't
-            // do that, in the minimum we should fail the tasks and hence the
-            // job.
-            LOG.warn("Task " + tid
-                + " 's maxVmemPerTask is greater than TT's limitMaxVmPerTask");
-          }
-
           if (limit != JobConf.DISABLED_MEMORY_LIMIT
               && currentMemUsage > limit) {
             // Task (the root process) is still alive and overflowing memory.
@@ -242,12 +230,11 @@
         }
       }
 
-      LOG.debug("Memory still in usage across all tasks : " + memoryStillInUsage
-          + "bytes. Total limit : " + maxMemoryAllowedForAllTasks);
-
       if (memoryStillInUsage > maxMemoryAllowedForAllTasks) {
-        LOG.warn("The total memory usage is still overflowing TTs limits."
-            + " Trying to kill a few tasks with the least progress.");
+        LOG.warn("The total memory in usage " + memoryStillInUsage
+            + " is still overflowing TTs limits "
+            + maxMemoryAllowedForAllTasks
+            + ". Trying to kill a few tasks with the least progress.");
         killTasksWithLeastProgress(memoryStillInUsage);
       }
     

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Tue May 26 13:30:37 2009
@@ -368,7 +368,7 @@
       vargs.add(Integer.toString(address.getPort())); 
       vargs.add(taskid.toString());                      // pass task identifier
 
-      tracker.addToMemoryManager(t.getTaskID(), conf);
+      tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);
 
       // set memory limit using ulimit if feasible and necessary ...
       String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);