You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2009/05/26 15:30:38 UTC

svn commit: r778696 [2/2] - in /hadoop/core/trunk: ./ conf/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/mapred/ src/mapred/org/apache/hadoop/mapred/ src/test/ma...

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue May 26 13:30:37 2009
@@ -200,58 +200,10 @@
   private TaskMemoryManagerThread taskMemoryManager;
   private boolean taskMemoryManagerEnabled = true;
   private long totalVirtualMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
-  private long totalPmemOnTT = JobConf.DISABLED_MEMORY_LIMIT;
-  private long reservedVirtualMemory = JobConf.DISABLED_MEMORY_LIMIT;
-  private long reservedPmem = JobConf.DISABLED_MEMORY_LIMIT;
-
-  // Cluster wide default value for max-vm per task
-  private long defaultMaxVmPerTask = JobConf.DISABLED_MEMORY_LIMIT;
-  // Cluster wide upper limit on max-vm per task
-  private long limitMaxVmPerTask = JobConf.DISABLED_MEMORY_LIMIT;
-
-  /**
-   * Configuration property to specify the amount of virtual memory that has to
-   * be reserved by the TaskTracker for system usage (OS, TT etc). The reserved
-   * virtual memory should be a part of the total virtual memory available on
-   * the TaskTracker. TaskTracker obtains the total virtual memory available on
-   * the system by using a {@link MemoryCalculatorPlugin}. The total physical
-   * memory is set to {@link JobConf#DISABLED_MEMORY_LIMIT} on systems lacking a
-   * MemoryCalculatorPlugin implementation.
-   * 
-   * <p>
-   * 
-   * The reserved virtual memory and the total virtual memory values are
-   * reported by the TaskTracker as part of heart-beat so that they can
-   * considered by a scheduler.
-   * 
-   * <p>
-   * 
-   * These two values are also used by the TaskTracker for tracking tasks'
-   * memory usage. Memory management functionality on a TaskTracker is disabled
-   * if this property is not set, if it more than the total virtual memory
-   * reported by MemoryCalculatorPlugin, or if either of the values is negative.
-   */
-  static final String MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY =
-      "mapred.tasktracker.vmem.reserved";
-
-  /**
-   * Configuration property to specify the amount of physical memory that has to
-   * be reserved by the TaskTracker for system usage (OS, TT etc). The reserved
-   * physical memory should be a part of the total physical memory available on
-   * the TaskTracker. TaskTracker obtains the total physical memory available on
-   * the system by using a {@link MemoryCalculatorPlugin}. The total physical
-   * memory is set to {@link JobConf#DISABLED_MEMORY_LIMIT} on systems lacking a
-   * MemoryCalculatorPlugin implementation.
-   * 
-   * <p>
-   * 
-   * The reserved virtual memory and the total virtual memory values are
-   * reported by the TaskTracker as part of heart-beat so that they can
-   * considered by a scheduler.
-   * 
-   */
-  static final String MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY =
-      "mapred.tasktracker.pmem.reserved";
+  private long totalPhysicalMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+  private long mapSlotMemorySizeOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+  private long reduceSlotSizeMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+  private long totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT;
 
   static final String MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY =
       "mapred.tasktracker.memory_calculator_plugin";
@@ -1247,14 +1199,14 @@
       long freeDiskSpace = getFreeSpace();
       long totVmem = getTotalVirtualMemoryOnTT();
       long totPmem = getTotalPhysicalMemoryOnTT();
-      long rsrvdVmem = getReservedVirtualMemory();
-      long rsrvdPmem = getReservedPhysicalMemory();
 
       status.getResourceStatus().setAvailableSpace(freeDiskSpace);
       status.getResourceStatus().setTotalVirtualMemory(totVmem);
       status.getResourceStatus().setTotalPhysicalMemory(totPmem);
-      status.getResourceStatus().setReservedVirtualMemory(rsrvdVmem);
-      status.getResourceStatus().setReservedPhysicalMemory(rsrvdPmem);
+      status.getResourceStatus().setMapSlotMemorySizeOnTT(
+          mapSlotMemorySizeOnTT);
+      status.getResourceStatus().setReduceSlotMemorySizeOnTT(
+          reduceSlotSizeMemoryOnTT);
     }
       
     //
@@ -1317,53 +1269,11 @@
    * @return total size of physical memory.
    */
   long getTotalPhysicalMemoryOnTT() {
-    return totalPmemOnTT;
-  }
-
-  /**
-   * Return the amount of virtual memory reserved on the TaskTracker for system
-   * usage (OS, TT etc).
-   */
-  long getReservedVirtualMemory() {
-    return reservedVirtualMemory;
-  }
-
-  /**
-   * Return the amount of physical memory reserved on the TaskTracker for system
-   * usage (OS, TT etc).
-   */
-  long getReservedPhysicalMemory() {
-    return reservedPmem;
+    return totalPhysicalMemoryOnTT;
   }
 
-  /**
-   * Return the limit on the maxVMemPerTask on this TaskTracker
-   * @return limitMaxVmPerTask
-   */
-  long getLimitMaxVMemPerTask() {
-    return limitMaxVmPerTask;
-  }
-
-  /**
-   * Obtain the virtual memory allocated for a TIP.
-   * 
-   * If the TIP's job has a configured value for the max-virtual memory, that
-   * will be returned. Else, the cluster-wide default maxvirtual memory for
-   * tasks is returned.
-   * 
-   * @param conf
-   * @return the virtual memory allocated for the TIP.
-   */
-  long getVirtualMemoryForTask(JobConf conf) {
-    long vMemForTask =
-        normalizeMemoryConfigValue(conf.getMaxVirtualMemoryForTask());
-    if (vMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
-      vMemForTask =
-          normalizeMemoryConfigValue(fConf.getLong(
-              JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-              JobConf.DISABLED_MEMORY_LIMIT));
-    }
-    return vMemForTask;
+  long getTotalMemoryAllottedForTasksOnTT() {
+    return totalMemoryAllottedForTasks;
   }
 
   /**
@@ -1637,7 +1547,6 @@
 
   private TaskLauncher mapLauncher;
   private TaskLauncher reduceLauncher;
-      
   public JvmManager getJvmManagerInstance() {
     return jvmManager;
   }
@@ -1775,10 +1684,12 @@
     }
   }
   
-  void addToMemoryManager(TaskAttemptID attemptId, 
+  void addToMemoryManager(TaskAttemptID attemptId, boolean isMap, 
                           JobConf conf) {
     if (isTaskMemoryManagerEnabled()) {
-      taskMemoryManager.addTask(attemptId, getVirtualMemoryForTask(conf));
+      taskMemoryManager.addTask(attemptId, isMap ? conf
+          .getMemoryForMapTask() * 1024 * 1024L : conf
+          .getMemoryForReduceTask() * 1024 * 1024L);
     }
   }
 
@@ -3119,33 +3030,35 @@
             + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT);
         totalVirtualMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
       }
-      totalPmemOnTT = memoryCalculatorPlugin.getPhysicalMemorySize();
-      if (totalPmemOnTT <= 0) {
+      totalPhysicalMemoryOnTT = memoryCalculatorPlugin.getPhysicalMemorySize();
+      if (totalPhysicalMemoryOnTT <= 0) {
         LOG.warn("TaskTracker's totalPmem could not be calculated. "
             + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT);
-        totalPmemOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+        totalPhysicalMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
       }
     }
 
-    reservedVirtualMemory =
-        normalizeMemoryConfigValue(fConf.getLong(
-            TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY,
-            JobConf.DISABLED_MEMORY_LIMIT));
-
-    reservedPmem =
-        normalizeMemoryConfigValue(fConf.getLong(
-            TaskTracker.MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY,
-            JobConf.DISABLED_MEMORY_LIMIT));
-
-    defaultMaxVmPerTask =
-        normalizeMemoryConfigValue(fConf.getLong(
-            JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-            JobConf.DISABLED_MEMORY_LIMIT));
-
-    limitMaxVmPerTask =
-        normalizeMemoryConfigValue(fConf.getLong(
-            JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-            JobConf.DISABLED_MEMORY_LIMIT));
+    mapSlotMemorySizeOnTT =
+        fConf.getLong(
+            JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT);
+    reduceSlotSizeMemoryOnTT =
+        fConf.getLong(
+            JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT);
+    totalMemoryAllottedForTasks =
+        maxCurrentMapTasks * mapSlotMemorySizeOnTT + maxCurrentReduceTasks
+            * reduceSlotSizeMemoryOnTT;
+    if (totalMemoryAllottedForTasks < 0) {
+      totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT;
+    }
+    if (totalMemoryAllottedForTasks > totalPhysicalMemoryOnTT) {
+      LOG.info("totalMemoryAllottedForTasks > totalPhysicalMemoryOnTT."
+          + " Thrashing might happen.");
+    } else if (totalMemoryAllottedForTasks > totalVirtualMemoryOnTT) {
+      LOG.info("totalMemoryAllottedForTasks > totalVirtualMemoryOnTT."
+          + " Thrashing might happen.");
+    }
 
     // start the taskMemoryManager thread only if enabled
     setTaskMemoryManagerEnabledFlag();
@@ -3164,55 +3077,12 @@
       return;
     }
 
-    // /// Missing configuration
-    StringBuilder mesg = new StringBuilder();
-
-    long totalVmemOnTT = getTotalVirtualMemoryOnTT();
-    if (totalVmemOnTT == JobConf.DISABLED_MEMORY_LIMIT) {
-      mesg.append("TaskTracker's totalVmem could not be calculated.\n");
+    if (totalMemoryAllottedForTasks == JobConf.DISABLED_MEMORY_LIMIT) {
       taskMemoryManagerEnabled = false;
-    }
-
-    long reservedVmem = getReservedVirtualMemory();
-    if (reservedVmem == JobConf.DISABLED_MEMORY_LIMIT) {
-      mesg.append("TaskTracker's reservedVmem is not configured.\n");
-      taskMemoryManagerEnabled = false;
-    }
-
-    if (defaultMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT) {
-      mesg.append("TaskTracker's defaultMaxVmPerTask is not configured.\n");
-      taskMemoryManagerEnabled = false;
-    }
-
-    if (limitMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT) {
-      mesg.append("TaskTracker's limitMaxVmPerTask is not configured.\n");
-      taskMemoryManagerEnabled = false;
-    }
-
-    if (!taskMemoryManagerEnabled) {
-      LOG.warn(mesg.toString() + "TaskMemoryManager is disabled.");
-      return;
-    }
-    // ///// End of missing configuration
-
-    // ///// Mis-configuration
-    if (defaultMaxVmPerTask > limitMaxVmPerTask) {
-      mesg.append("defaultMaxVmPerTask is mis-configured. "
-          + "It shouldn't be greater than limitMaxVmPerTask. ");
-      taskMemoryManagerEnabled = false;
-    }
-
-    if (reservedVmem > totalVmemOnTT) {
-      mesg.append("reservedVmemOnTT is mis-configured. "
-          + "It shouldn't be greater than totalVmemOnTT");
-      taskMemoryManagerEnabled = false;
-    }
-
-    if (!taskMemoryManagerEnabled) {
-      LOG.warn(mesg.toString() + "TaskMemoryManager is disabled.");
+      LOG.warn("TaskTracker's totalMemoryAllottedForTasks is -1."
+          + " TaskMemoryManager is disabled.");
       return;
     }
-    // ///// End of mis-configuration
 
     taskMemoryManagerEnabled = true;
   }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Tue May 26 13:30:37 2009
@@ -55,16 +55,16 @@
   static class ResourceStatus implements Writable {
     
     private long totalVirtualMemory;
-    private long reservedVirtualMemory;
     private long totalPhysicalMemory;
-    private long reservedPhysicalMemory;
+    private long mapSlotMemorySizeOnTT;
+    private long reduceSlotMemorySizeOnTT;
     private long availableSpace;
     
     ResourceStatus() {
       totalVirtualMemory = JobConf.DISABLED_MEMORY_LIMIT;
-      reservedVirtualMemory = JobConf.DISABLED_MEMORY_LIMIT;
       totalPhysicalMemory = JobConf.DISABLED_MEMORY_LIMIT;
-      reservedPhysicalMemory = JobConf.DISABLED_MEMORY_LIMIT;
+      mapSlotMemorySizeOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+      reduceSlotMemorySizeOnTT = JobConf.DISABLED_MEMORY_LIMIT;
       availableSpace = Long.MAX_VALUE;
     }
 
@@ -90,24 +90,6 @@
     }
 
     /**
-     * Set the amount of virtual memory reserved on the TaskTracker for system
-     * usage (OS, TT etc).
-     * 
-     * @param reservedVmem amount of virtual memory reserved in bytes.
-     */
-    void setReservedVirtualMemory(long reservedVmem) {
-      reservedVirtualMemory = reservedVmem;
-    }
-
-    /**
-     * Get the amount of virtual memory reserved on the TaskTracker for system
-     * usage (OS, TT etc).
-     */
-    long getReservedTotalMemory() {
-      return reservedVirtualMemory;
-    }
-
-    /**
      * Set the maximum amount of physical memory on the tasktracker.
      * 
      * @param totalRAM maximum amount of physical memory on the tasktracker in
@@ -130,23 +112,49 @@
     }
 
     /**
-     * Set the amount of physical memory reserved on the TaskTracker for system
-     * usage (OS, TT etc).
+     * Set the memory size of each map slot on this TT. This will be used by JT
+     * for accounting more slots for jobs that use more memory.
+     * 
+     * @param mem the memory size of each map slot on this TT
+     */
+    void setMapSlotMemorySizeOnTT(long mem) {
+      mapSlotMemorySizeOnTT = mem;
+    }
+
+    /**
+     * Get the memory size of each map slot on this TT. See
+     * {@link #setMapSlotMemorySizeOnTT(long)}
      * 
-     * @param reservedPmem amount of physical memory reserved in bytes.
+     * @return the memory size of each map slot on this TT
      */
-    void setReservedPhysicalMemory(long reservedPmem) {
-      reservedPhysicalMemory = reservedPmem;
+    long getMapSlotMemorySizeOnTT() {
+      return mapSlotMemorySizeOnTT;
     }
 
     /**
-     * Get the amount of physical memory reserved on the TaskTracker for system
-     * usage (OS, TT etc).
+     * Set the memory size of each reduce slot on this TT. This will be used by
+     * JT for accounting more slots for jobs that use more memory.
+     * 
+     * @param mem the memory size of each reduce slot on this TT
      */
-    long getReservedPhysicalMemory() {
-      return reservedPhysicalMemory;
+    void setReduceSlotMemorySizeOnTT(long mem) {
+      reduceSlotMemorySizeOnTT = mem;
     }
 
+    /**
+     * Get the memory size of each reduce slot on this TT. See
+     * {@link #setReduceSlotMemorySizeOnTT(long)}
+     * 
+     * @return the memory size of each reduce slot on this TT
+     */
+    long getReduceSlotMemorySizeOnTT() {
+      return reduceSlotMemorySizeOnTT;
+    }
+
+    /**
+     * Set the available disk space on the TT
+     * @param availSpace the available disk space on the TT
+     */
     void setAvailableSpace(long availSpace) {
       availableSpace = availSpace;
     }
@@ -161,17 +169,17 @@
     
     public void write(DataOutput out) throws IOException {
       WritableUtils.writeVLong(out, totalVirtualMemory);
-      WritableUtils.writeVLong(out, reservedVirtualMemory);
       WritableUtils.writeVLong(out, totalPhysicalMemory);
-      WritableUtils.writeVLong(out, reservedPhysicalMemory);
+      WritableUtils.writeVLong(out, mapSlotMemorySizeOnTT);
+      WritableUtils.writeVLong(out, reduceSlotMemorySizeOnTT);
       WritableUtils.writeVLong(out, availableSpace);
     }
     
     public void readFields(DataInput in) throws IOException {
       totalVirtualMemory = WritableUtils.readVLong(in);
-      reservedVirtualMemory = WritableUtils.readVLong(in);
       totalPhysicalMemory = WritableUtils.readVLong(in);
-      reservedPhysicalMemory = WritableUtils.readVLong(in);
+      mapSlotMemorySizeOnTT = WritableUtils.readVLong(in);
+      reduceSlotMemorySizeOnTT = WritableUtils.readVLong(in);
       availableSpace = WritableUtils.readVLong(in);
     }
   }

Added: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java?rev=778696&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java (added)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java Tue May 26 13:30:37 2009
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.ToolRunner;
+
+import junit.framework.TestCase;
+
+public class TestSubmitJob extends TestCase {
+  private MiniMRCluster miniMRCluster;
+
+  @Override
+  protected void tearDown()
+      throws Exception {
+    if (miniMRCluster != null) {
+      miniMRCluster.shutdown();
+    }
+  }
+
+  /**
+   * Test to verify that jobs with invalid memory requirements are killed at the
+   * JT.
+   * 
+   * @throws Exception
+   */
+  public void testJobWithInvalidMemoryReqs()
+      throws Exception {
+    JobConf jtConf = new JobConf();
+    jtConf
+        .setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024L);
+    jtConf.setLong(JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+        2 * 1024L);
+    jtConf.setLong(JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+        3 * 1024L);
+    jtConf.setLong(JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+        4 * 1024L);
+
+    miniMRCluster = new MiniMRCluster(0, "file:///", 0, null, null, jtConf);
+
+    JobConf clusterConf = miniMRCluster.createJobConf();
+
+    // No map-memory configuration
+    JobConf jobConf = new JobConf(clusterConf);
+    jobConf.setMemoryForReduceTask(1 * 1024L);
+    runJobAndVerifyFailure(jobConf, JobConf.DISABLED_MEMORY_LIMIT, 1 * 1024L,
+        "Invalid job requirements.");
+
+    // No reduce-memory configuration
+    jobConf = new JobConf(clusterConf);
+    jobConf.setMemoryForMapTask(1 * 1024L);
+    runJobAndVerifyFailure(jobConf, 1 * 1024L, JobConf.DISABLED_MEMORY_LIMIT,
+        "Invalid job requirements.");
+
+    // Invalid map-memory configuration
+    jobConf = new JobConf(clusterConf);
+    jobConf.setMemoryForMapTask(4 * 1024L);
+    jobConf.setMemoryForReduceTask(1 * 1024L);
+    runJobAndVerifyFailure(jobConf, 4 * 1024L, 1 * 1024L,
+        "Exceeds the cluster's max-memory-limit.");
+
+    // Invalid reduce-memory configuration
+    jobConf = new JobConf(clusterConf);
+    jobConf.setMemoryForMapTask(1 * 1024L);
+    jobConf.setMemoryForReduceTask(5 * 1024L);
+    runJobAndVerifyFailure(jobConf, 1 * 1024L, 5 * 1024L,
+        "Exceeds the cluster's max-memory-limit.");
+  }
+
+  private void runJobAndVerifyFailure(JobConf jobConf, long memForMapTasks,
+      long memForReduceTasks, String expectedMsg)
+      throws Exception,
+      IOException {
+    String[] args = { "-m", "0", "-r", "0", "-mt", "0", "-rt", "0" };
+    boolean throwsException = false;
+    String msg = null;
+    try {
+      ToolRunner.run(jobConf, new SleepJob(), args);
+    } catch (RemoteException re) {
+      throwsException = true;
+      msg = re.unwrapRemoteException().getMessage();
+    }
+    assertTrue(throwsException);
+    assertNotNull(msg);
+
+    String overallExpectedMsg =
+        "(" + memForMapTasks + " memForMapTasks " + memForReduceTasks
+            + " memForReduceTasks): " + expectedMsg;
+    assertTrue("Observed message - " + msg
+        + " - doesn't contain expected message - " + overallExpectedMsg, msg
+        .contains(overallExpectedMsg));
+  }
+}

Modified: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java (original)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java Tue May 26 13:30:37 2009
@@ -22,10 +22,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.examples.SleepJob;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.util.LinuxMemoryCalculatorPlugin;
 import org.apache.hadoop.util.MemoryCalculatorPlugin;
 import org.apache.hadoop.util.ToolRunner;
@@ -46,7 +43,6 @@
 
   static final Log LOG = LogFactory.getLog(TestTTMemoryReporting.class);
   
-  private MiniDFSCluster miniDFSCluster;
   private MiniMRCluster miniMRCluster;
 
   /**
@@ -77,41 +73,42 @@
           getConf().getLong("totalVmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
       long totalPhysicalMemoryOnTT =
           getConf().getLong("totalPmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
-      long virtualMemoryReservedOnTT =
-          getConf().getLong("reservedVmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
-      long physicalMemoryReservedOnTT =
-          getConf().getLong("reservedPmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
+      long mapSlotMemorySize =
+          getConf().getLong("mapSlotMemorySize", JobConf.DISABLED_MEMORY_LIMIT);
+      long reduceSlotMemorySize =
+          getConf()
+              .getLong("reduceSlotMemorySize", JobConf.DISABLED_MEMORY_LIMIT);
 
       long reportedTotalVirtualMemoryOnTT =
           status.getResourceStatus().getTotalVirtualMemory();
       long reportedTotalPhysicalMemoryOnTT =
           status.getResourceStatus().getTotalPhysicalMemory();
-      long reportedVirtualMemoryReservedOnTT =
-          status.getResourceStatus().getReservedTotalMemory();
-      long reportedPhysicalMemoryReservedOnTT =
-          status.getResourceStatus().getReservedPhysicalMemory();
+      long reportedMapSlotMemorySize =
+          status.getResourceStatus().getMapSlotMemorySizeOnTT();
+      long reportedReduceSlotMemorySize =
+          status.getResourceStatus().getReduceSlotMemorySizeOnTT();
 
       message =
           "expected memory values : (totalVirtualMemoryOnTT, totalPhysicalMemoryOnTT, "
-              + "virtualMemoryReservedOnTT, physicalMemoryReservedOnTT) = ("
-              + totalVirtualMemoryOnTT + ", " + totalPhysicalMemoryOnTT + ", "
-              + virtualMemoryReservedOnTT + ", " + physicalMemoryReservedOnTT
-              + ")";
+              + "mapSlotMemSize, reduceSlotMemorySize) = ("
+              + totalVirtualMemoryOnTT + ", " + totalPhysicalMemoryOnTT + ","
+              + mapSlotMemorySize + "," + reduceSlotMemorySize + ")";
       message +=
           "\nreported memory values : (totalVirtualMemoryOnTT, totalPhysicalMemoryOnTT, "
-              + "virtualMemoryReservedOnTT, physicalMemoryReservedOnTT) = ("
+              + "reportedMapSlotMemorySize, reportedReduceSlotMemorySize) = ("
               + reportedTotalVirtualMemoryOnTT
               + ", "
               + reportedTotalPhysicalMemoryOnTT
-              + ", "
-              + reportedVirtualMemoryReservedOnTT
-              + ", "
-              + reportedPhysicalMemoryReservedOnTT + ")";
+              + ","
+              + reportedMapSlotMemorySize
+              + ","
+              + reportedReduceSlotMemorySize
+              + ")";
       LOG.info(message);
       if (totalVirtualMemoryOnTT != reportedTotalVirtualMemoryOnTT
           || totalPhysicalMemoryOnTT != reportedTotalPhysicalMemoryOnTT
-          || virtualMemoryReservedOnTT != reportedVirtualMemoryReservedOnTT
-          || physicalMemoryReservedOnTT != reportedPhysicalMemoryReservedOnTT) {
+          || mapSlotMemorySize != reportedMapSlotMemorySize
+          || reduceSlotMemorySize != reportedReduceSlotMemorySize) {
         hasPassed = false;
       }
       return super.assignTasks(status);
@@ -132,7 +129,7 @@
           TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
           DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
       setUpCluster(conf);
-      runSleepJob();
+      runSleepJob(miniMRCluster.createJobConf());
       verifyTestResults();
     } finally {
       tearDownCluster();
@@ -149,8 +146,9 @@
     JobConf conf = new JobConf();
     conf.setLong("totalVmemOnTT", 4 * 1024 * 1024 * 1024L);
     conf.setLong("totalPmemOnTT", 2 * 1024 * 1024 * 1024L);
-    conf.setLong("reservedVmemOnTT", 1 * 1024 * 1024 * 1024L);
-    conf.setLong("reservedPmemOnTT", 512 * 1024 * 1024L);
+    conf.setLong("mapSlotMemorySize", 1 * 512L);
+    conf.setLong("reduceSlotMemorySize", 1 * 1024L);
+
     conf.setClass(
         TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
         DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
@@ -158,15 +156,17 @@
         4 * 1024 * 1024 * 1024L);
     conf.setLong(DummyMemoryCalculatorPlugin.MAXPMEM_TESTING_PROPERTY,
         2 * 1024 * 1024 * 1024L);
+    conf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+        512L);
     conf.setLong(
-        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY,
-        1 * 1024 * 1024 * 1024L);
-    conf.setLong(
-        TaskTracker.MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY,
-        512 * 1024 * 1024L);
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1024L);
+    
     try {
       setUpCluster(conf);
-      runSleepJob();
+      JobConf jobConf = miniMRCluster.createJobConf();
+      jobConf.setMemoryForMapTask(1 * 1024L);
+      jobConf.setMemoryForReduceTask(2 * 1024L);
+      runSleepJob(jobConf);
       verifyTestResults();
     } finally {
       tearDownCluster();
@@ -189,17 +189,10 @@
     LinuxMemoryCalculatorPlugin plugin = new LinuxMemoryCalculatorPlugin();
     conf.setLong("totalVmemOnTT", plugin.getVirtualMemorySize());
     conf.setLong("totalPmemOnTT", plugin.getPhysicalMemorySize());
-    conf.setLong("reservedVmemOnTT", 1 * 1024 * 1024 * 1024L);
-    conf.setLong("reservedPmemOnTT", 512 * 1024 * 1024L);
-    conf.setLong(
-        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY,
-        1 * 1024 * 1024 * 1024L);
-    conf.setLong(
-        TaskTracker.MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY,
-        512 * 1024 * 1024L);
+
     try {
       setUpCluster(conf);
-      runSleepJob();
+      runSleepJob(miniMRCluster.createJobConf());
       verifyTestResults();
     } finally {
       tearDownCluster();
@@ -208,22 +201,15 @@
 
   private void setUpCluster(JobConf conf)
                                 throws Exception {
-    conf.setClass("mapred.jobtracker.taskScheduler", 
-        TestTTMemoryReporting.FakeTaskScheduler.class,
-        TaskScheduler.class);
-    miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
-    FileSystem fileSys = miniDFSCluster.getFileSystem();
-    String namenode = fileSys.getUri().toString();
-    miniMRCluster = new MiniMRCluster(1, namenode, 3, 
-                      null, null, conf);    
+    conf.setClass("mapred.jobtracker.taskScheduler",
+        TestTTMemoryReporting.FakeTaskScheduler.class, TaskScheduler.class);
+    conf.set("mapred.job.tracker.handler.count", "1");
+    miniMRCluster = new MiniMRCluster(1, "file:///", 3, null, null, conf);
   }
   
-  private void runSleepJob() throws Exception {
-    Configuration conf = new Configuration();
-    conf.set("mapred.job.tracker", "localhost:"
-                              + miniMRCluster.getJobTrackerPort());
+  private void runSleepJob(JobConf conf) throws Exception {
     String[] args = { "-m", "1", "-r", "1",
-                      "-mt", "1000", "-rt", "1000" };
+                      "-mt", "10", "-rt", "10" };
     ToolRunner.run(conf, new SleepJob(), args);
   }
 
@@ -235,7 +221,8 @@
   }
   
   private void tearDownCluster() {
-    if (miniMRCluster != null) { miniMRCluster.shutdown(); }
-    if (miniDFSCluster != null) { miniDFSCluster.shutdown(); }
+    if (miniMRCluster != null) {
+      miniMRCluster.shutdown();
+    }
   }
-}
\ No newline at end of file
+}

Modified: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java (original)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java Tue May 26 13:30:37 2009
@@ -19,20 +19,19 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.regex.Pattern;
 import java.util.regex.Matcher;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.MemoryCalculatorPlugin;
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.fs.FileSystem;
 
 import junit.framework.TestCase;
 
@@ -43,18 +42,19 @@
 
   private static final Log LOG =
       LogFactory.getLog(TestTaskTrackerMemoryManager.class);
-  private MiniDFSCluster miniDFSCluster;
   private MiniMRCluster miniMRCluster;
 
   private String taskOverLimitPatternString =
       "TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond memory-limits. "
           + "Current usage : [0-9]*bytes. Limit : %sbytes. Killing task.";
 
-  private void startCluster(JobConf conf) throws Exception {
-    miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
-    FileSystem fileSys = miniDFSCluster.getFileSystem();
-    String namenode = fileSys.getUri().toString();
-    miniMRCluster = new MiniMRCluster(1, namenode, 1, null, null, conf);
+  private void startCluster(JobConf conf)
+      throws Exception {
+    conf.set("mapred.job.tracker.handler.count", "1");
+    conf.set("mapred.tasktracker.map.tasks.maximum", "1");
+    conf.set("mapred.tasktracker.reduce.tasks.maximum", "1");
+    conf.set("mapred.tasktracker.tasks.sleeptime-before-sigkill", "0");
+    miniMRCluster = new MiniMRCluster(1, "file:///", 1, null, null, conf);
   }
 
   @Override
@@ -62,9 +62,6 @@
     if (miniMRCluster != null) {
       miniMRCluster.shutdown();
     }
-    if (miniDFSCluster != null) {
-      miniDFSCluster.shutdown();
-    }
   }
 
   private int runSleepJob(JobConf conf) throws Exception {
@@ -74,15 +71,6 @@
 
   private void runAndCheckSuccessfulJob(JobConf conf)
       throws IOException {
-    // Set up job.
-    JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker();
-    conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":"
-        + jt.getTrackerPort());
-    NameNode nn = miniDFSCluster.getNameNode();
-    conf.set("fs.default.name", "hdfs://"
-        + nn.getNameNodeAddress().getHostName() + ":"
-        + nn.getNameNodeAddress().getPort());
-
     Pattern taskOverLimitPattern =
         Pattern.compile(String.format(taskOverLimitPatternString, "[0-9]*"));
     Matcher mat = null;
@@ -148,43 +136,12 @@
       return;
     }
 
-    JobConf conf = new JobConf();
     // Task-memory management disabled by default.
-    startCluster(conf);
-    long PER_TASK_LIMIT = 100L; // Doesn't matter how low.
-    conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
-    runAndCheckSuccessfulJob(conf);
-  }
-
-  /**
-   * Test for verifying that tasks with no limits, with the cumulative usage
-   * still under TT's limits, succeed.
-   * 
-   * @throws Exception
-   */
-  public void testTasksWithNoLimits()
-      throws Exception {
-    // Run the test only if memory management is enabled
-    if (!isProcfsBasedTreeAvailable()) {
-      return;
-    }
-
-    // Fairly large value for sleepJob to succeed
-    long ttLimit = 4 * 1024 * 1024 * 1024L;
-    // Start cluster with proper configuration.
-    JobConf fConf = new JobConf();
-
-    fConf.setClass(
-        TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
-        DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
-    fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
-        ttLimit);
-    fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, ttLimit);
-    fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, ttLimit);
-    fConf.setLong(
-        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0);
-    startCluster(fConf);
-    JobConf conf = new JobConf();
+    startCluster(new JobConf());
+    long PER_TASK_LIMIT = 1L; // Doesn't matter how low.
+    JobConf conf = miniMRCluster.createJobConf();
+    conf.setMemoryForMapTask(PER_TASK_LIMIT);
+    conf.setMemoryForReduceTask(PER_TASK_LIMIT);
     runAndCheckSuccessfulJob(conf);
   }
 
@@ -202,33 +159,25 @@
     }
 
     // Large so that sleepjob goes through and fits total TT usage
-    long PER_TASK_LIMIT = 2 * 1024 * 1024 * 1024L;
-    long TASK_TRACKER_LIMIT = 4 * 1024 * 1024 * 1024L;
+    long PER_TASK_LIMIT = 2 * 1024L;
 
     // Start cluster with proper configuration.
     JobConf fConf = new JobConf();
-
-    fConf.setClass(
-        TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
-        DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
-    fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
-        TASK_TRACKER_LIMIT);
-    fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        TASK_TRACKER_LIMIT);
-    fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        TASK_TRACKER_LIMIT);
+    fConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+        2 * 1024L);
     fConf.setLong(
-        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0);
-    startCluster(fConf);
-    JobConf conf = new JobConf();
-    conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+        2 * 1024L);
+    startCluster(fConf); // use fConf so the memory settings above take effect
+
+    JobConf conf = new JobConf(miniMRCluster.createJobConf());
+    conf.setMemoryForMapTask(PER_TASK_LIMIT);
+    conf.setMemoryForReduceTask(PER_TASK_LIMIT);
     runAndCheckSuccessfulJob(conf);
-
   }
 
   /**
-   * Test for verifying that tasks that go beyond limits, though the cumulative
-   * usage is under TT's limits, get killed.
+   * Test for verifying that tasks that go beyond limits get killed.
    * 
    * @throws Exception
    */
@@ -240,43 +189,32 @@
       return;
     }
 
-    long PER_TASK_LIMIT = 444; // Low enough to kill off sleepJob tasks.
-    long TASK_TRACKER_LIMIT = 4 * 1024 * 1024 * 1024L; // Large so as to fit
-    // total usage
+    long PER_TASK_LIMIT = 1L; // Low enough to kill off sleepJob tasks.
+
     Pattern taskOverLimitPattern =
         Pattern.compile(String.format(taskOverLimitPatternString, String
-            .valueOf(PER_TASK_LIMIT)));
+            .valueOf(PER_TASK_LIMIT*1024*1024L)));
     Matcher mat = null;
 
     // Start cluster with proper configuration.
     JobConf fConf = new JobConf();
-    fConf.setClass(
-        TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
-        DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
-    fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
-        TASK_TRACKER_LIMIT);
-    fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        TASK_TRACKER_LIMIT);
-    fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        TASK_TRACKER_LIMIT);
-    fConf.setLong(
-        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0);
 
     // very small value, so that no task escapes to successful completion.
     fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval",
         String.valueOf(300));
+    fConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+        2 * 1024);
+    fConf.setLong(
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+        2 * 1024);
     startCluster(fConf);
 
     // Set up job.
-    JobConf conf = new JobConf();
-    conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
-    JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker();
-    conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":"
-        + jt.getTrackerPort());
-    NameNode nn = miniDFSCluster.getNameNode();
-    conf.set("fs.default.name", "hdfs://"
-        + nn.getNameNodeAddress().getHostName() + ":"
-        + nn.getNameNodeAddress().getPort());
+    JobConf conf = new JobConf(miniMRCluster.createJobConf());
+    conf.setMemoryForMapTask(PER_TASK_LIMIT);
+    conf.setMemoryForReduceTask(PER_TASK_LIMIT);
+    conf.setMaxMapAttempts(1);
+    conf.setMaxReduceAttempts(1);
 
     // Start the job.
     int ret = 0;
@@ -334,48 +272,39 @@
     }
 
     // Large enough for SleepJob Tasks.
-    long PER_TASK_LIMIT = 100000000000L;
-    // Very Limited TT. All tasks will be killed.
-    long TASK_TRACKER_LIMIT = 100L;
-    Pattern taskOverLimitPattern =
-        Pattern.compile(String.format(taskOverLimitPatternString, String
-            .valueOf(PER_TASK_LIMIT)));
-    Pattern trackerOverLimitPattern =
-        Pattern
-            .compile("Killing one of the least progress tasks - .*, as "
-                + "the cumulative memory usage of all the tasks on the TaskTracker"
-                + " exceeds virtual memory limit " + TASK_TRACKER_LIMIT + ".");
-    Matcher mat = null;
+    long PER_TASK_LIMIT = 100 * 1024L;
 
     // Start cluster with proper configuration.
     JobConf fConf = new JobConf();
-    fConf.setClass(
-        TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
-        DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
-    fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
-        TASK_TRACKER_LIMIT);
-    fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        TASK_TRACKER_LIMIT);
-    fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        TASK_TRACKER_LIMIT);
+    fConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+        1L);
     fConf.setLong(
-        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0);
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1L);
+
+    // One 1 MB map slot plus one 1 MB reduce slot: total TT limit is 2 MB.
+    long TASK_TRACKER_LIMIT = 2 * 1024 * 1024L;
+
     // very small value, so that no task escapes to successful completion.
     fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval",
         String.valueOf(300));
 
     startCluster(fConf);
 
+    // The kill message reports the limit in bytes; PER_TASK_LIMIT is in MB.
+    Pattern taskOverLimitPattern = Pattern.compile(String.format(
+        taskOverLimitPatternString, PER_TASK_LIMIT * 1024 * 1024L));
+
+    Pattern trackerOverLimitPattern =
+      Pattern
+          .compile("Killing one of the least progress tasks - .*, as "
+              + "the cumulative memory usage of all the tasks on the TaskTracker"
+              + " exceeds virtual memory limit " + TASK_TRACKER_LIMIT + ".");
+    Matcher mat = null;
+
     // Set up job.
-    JobConf conf = new JobConf();
-    conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
-    JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker();
-    conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":"
-        + jt.getTrackerPort());
-    NameNode nn = miniDFSCluster.getNameNode();
-    conf.set("fs.default.name", "hdfs://"
-        + nn.getNameNodeAddress().getHostName() + ":"
-        + nn.getNameNodeAddress().getPort());
+    JobConf conf = new JobConf(miniMRCluster.createJobConf());
+    conf.setMemoryForMapTask(PER_TASK_LIMIT);
+    conf.setMemoryForReduceTask(PER_TASK_LIMIT);
 
     JobClient jClient = new JobClient(conf);
     SleepJob sleepJob = new SleepJob();
@@ -385,10 +314,12 @@
     job.submit();
     boolean TTOverFlowMsgPresent = false;
     while (true) {
-      // Set-up tasks are the first to be launched.
-      TaskReport[] setUpReports = jClient.getSetupTaskReports(
-                                    (org.apache.hadoop.mapred.JobID)job.getID());
-      for (TaskReport tr : setUpReports) {
+      List<TaskReport> allTaskReports = new ArrayList<TaskReport>();
+      allTaskReports.addAll(Arrays.asList(jClient
+          .getSetupTaskReports((org.apache.hadoop.mapred.JobID) job.getID())));
+      allTaskReports.addAll(Arrays.asList(jClient
+          .getMapTaskReports((org.apache.hadoop.mapred.JobID) job.getID())));
+      for (TaskReport tr : allTaskReports) {
         String[] diag = tr.getDiagnostics();
         for (String str : diag) {
           mat = taskOverLimitPattern.matcher(str);