Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2008/11/19 07:21:34 UTC

svn commit: r718863 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java src/mapred/org/apache/hadoop/mapred/TaskTracker.java src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java

Author: yhemanth
Date: Tue Nov 18 22:21:33 2008
New Revision: 718863

URL: http://svn.apache.org/viewvc?rev=718863&view=rev
Log:
HADOOP-4523. Prevent too many tasks scheduled on a node from bringing it down by monitoring for cumulative memory usage across tasks. Contributed by Vinod Kumar Vavilapalli
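
For readers skimming the patch: the new cumulative check is driven by two knobs that both appear in the diffs below. Operators cap a TaskTracker's total task memory with "mapred.tasktracker.tasks.maxmemory" (read by TaskMemoryManagerThread via getMaxVirtualMemoryForTasks()), while jobs keep their per-task cap via JobConf.setMaxVirtualMemoryForTask(). A minimal sketch of wiring both up, reusing the "fairly large" values from the updated test; the class name and values are illustrative only, not part of this commit:

    import org.apache.hadoop.mapred.JobConf;

    public class MemoryLimitsSketch {
      public static void main(String[] args) {
        // TaskTracker-side daemon configuration: the node-wide cap that
        // TaskMemoryManagerThread now reads via getMaxVirtualMemoryForTasks().
        JobConf ttConf = new JobConf();
        ttConf.setLong("mapred.tasktracker.tasks.maxmemory", 10000000000L);
        // Sampling period of the monitor thread, in milliseconds
        // (5000L is the default used in the code below).
        ttConf.setLong(
            "mapred.tasktracker.taskmemorymanager.monitoring-interval", 5000L);

        // Job-side per-task limit; a task exceeding it is failed outright.
        JobConf jobConf = new JobConf();
        jobConf.setMaxVirtualMemoryForTask(10000000000L);
      }
    }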

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=718863&r1=718862&r2=718863&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Nov 18 22:21:33 2008
@@ -114,6 +114,10 @@
     HADOOP-4185. Adds setVerifyChecksum() method to FileSystem.
     (Sharad Agarwal via ddas)
 
+    HADOOP-4523. Prevent too many tasks scheduled on a node from bringing
+    it down by monitoring for cumulative memory usage across tasks.
+    (Vinod Kumar Vavilapalli via yhemanth)
+
   OPTIMIZATIONS
 
     HADOOP-3293. Fixes FileInputFormat to do provide locations for splits

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=718863&r1=718862&r2=718863&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Tue Nov 18 22:21:33 2008
@@ -31,6 +31,7 @@
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.TaskTracker;
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
 
 /**
@@ -45,6 +46,8 @@
   private long monitoringInterval;
   private long sleepTimeBeforeSigKill;
 
+  private long maxMemoryAllowedForAllTasks;
+
   private Map<TaskAttemptID, ProcessTreeInfo> processTreeInfoMap;
   private Map<TaskAttemptID, ProcessTreeInfo> tasksToBeAdded;
   private List<TaskAttemptID> tasksToBeRemoved;
@@ -57,6 +60,8 @@
     tasksToBeAdded = new HashMap<TaskAttemptID, ProcessTreeInfo>();
     tasksToBeRemoved = new ArrayList<TaskAttemptID>();
 
+    maxMemoryAllowedForAllTasks = taskTracker.getMaxVirtualMemoryForTasks();
+
     monitoringInterval = taskTracker.getJobConf().getLong(
         "mapred.tasktracker.taskmemorymanager.monitoring-interval", 5000L);
     sleepTimeBeforeSigKill = taskTracker.getJobConf().getLong(
@@ -155,6 +160,7 @@
         tasksToBeRemoved.clear();
       }
 
+      long memoryStillInUsage = 0;
       // Now, check memory usage and kill any overflowing tasks
       for (Iterator<Map.Entry<TaskAttemptID, ProcessTreeInfo>> it = processTreeInfoMap
           .entrySet().iterator(); it.hasNext();) {
@@ -207,15 +213,28 @@
               + "] is running beyond memory-limits. Current usage : "
               + currentMemUsage + "kB. Limit : " + limit + "kB. Killing task.";
           LOG.warn(msg);
-          taskTracker.cleanUpOverMemoryTask(tid, msg);
+          taskTracker.cleanUpOverMemoryTask(tid, true, msg);
 
           // Now destroy the ProcessTree, remove it from monitoring map.
           pTree.destroy();
           it.remove();
           LOG.info("Removed ProcessTree with root " + pId);
+        } else {
+          // Accounting the total memory in usage for all tasks that are still
+          // alive and within limits.
+          memoryStillInUsage += currentMemUsage;
         }
       }
 
+      LOG.debug("Memory still in usage across all tasks : " + memoryStillInUsage
+          + "kB. Total limit : " + maxMemoryAllowedForAllTasks);
+
+      if (memoryStillInUsage > maxMemoryAllowedForAllTasks) {
+        LOG.warn("The total memory usage is still overflowing TTs limits."
+            + " Trying to kill a few tasks with the least progress.");
+        killTasksWithLeastProgress(memoryStillInUsage);
+      }
+    
       // Sleep for some time before beginning next cycle
       try {
         LOG.debug(this.getClass() + " : Sleeping for " + monitoringInterval
@@ -229,6 +248,55 @@
     }
   }
 
+  private void killTasksWithLeastProgress(long memoryStillInUsage) {
+
+    List<TaskAttemptID> tasksToKill = new ArrayList<TaskAttemptID>();
+    List<TaskAttemptID> tasksToExclude = new ArrayList<TaskAttemptID>();
+    // Find tasks to kill so as to get memory usage under limits.
+    while (memoryStillInUsage > maxMemoryAllowedForAllTasks) {
+      // Exclude tasks that are already marked for
+      // killing.
+      TaskInProgress task = taskTracker.findTaskToKill(tasksToExclude);
+      if (task == null) {
+        break; // couldn't find any more tasks to kill.
+      }
+
+      TaskAttemptID tid = task.getTask().getTaskID();
+      if (processTreeInfoMap.containsKey(tid)) {
+        ProcessTreeInfo ptInfo = processTreeInfoMap.get(tid);
+        ProcfsBasedProcessTree pTree = ptInfo.getProcessTree();
+        memoryStillInUsage -= pTree.getCumulativeVmem();
+        tasksToKill.add(tid);
+      }
+      // Exclude this task from next search because it is already
+      // considered.
+      tasksToExclude.add(tid);
+    }
+
+    // Now kill the tasks.
+    if (!tasksToKill.isEmpty()) {
+      for (TaskAttemptID tid : tasksToKill) {
+        String msg =
+            "Killing one of the least progress tasks - " + tid
+                + ", as the cumulative memory usage of all the tasks on "
+                + "the TaskTracker exceeds virtual memory limit "
+                + maxMemoryAllowedForAllTasks + ".";
+        LOG.warn(msg);
+        // Kill the task and mark it as killed.
+        taskTracker.cleanUpOverMemoryTask(tid, false, msg);
+        // Now destroy the ProcessTree, remove it from monitoring map.
+        ProcessTreeInfo ptInfo = processTreeInfoMap.get(tid);
+        ProcfsBasedProcessTree pTree = ptInfo.getProcessTree();
+        pTree.destroy();
+        processTreeInfoMap.remove(tid);
+        LOG.info("Removed ProcessTree with root " + ptInfo.getPID());
+      }
+    } else {
+      LOG.info("The total memory usage is overflowing TTs limits. "
+          + "But found no alive task to kill for freeing memory.");
+    }
+  }
+
   /**
    * Load pid of the task from the pidFile.
    * 

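Stripped of logging and cleanup, the selection logic in killTasksWithLeastProgress() above works like this: keep asking the TaskTracker for the least-progressed live task not yet considered, credit that task's entire process tree against the projected usage, and stop once the projection fits under the limit or no candidate is left. A condensed restatement of that loop for reference; it relies on the fields and imports of TaskMemoryManagerThread shown in the diff and is not additional code in the commit:

    // Condensed restatement of the loop in killTasksWithLeastProgress().
    private List<TaskAttemptID> selectVictims(long projectedUsage) {
      List<TaskAttemptID> victims = new ArrayList<TaskAttemptID>();
      List<TaskAttemptID> considered = new ArrayList<TaskAttemptID>();
      while (projectedUsage > maxMemoryAllowedForAllTasks) {
        TaskInProgress task = taskTracker.findTaskToKill(considered);
        if (task == null) {
          break; // no live, killable task remains
        }
        TaskAttemptID tid = task.getTask().getTaskID();
        ProcessTreeInfo ptInfo = processTreeInfoMap.get(tid);
        if (ptInfo != null) {
          // Credit the task's whole process tree against the projection.
          projectedUsage -= ptInfo.getProcessTree().getCumulativeVmem();
          victims.add(tid);
        }
        considered.add(tid); // never re-examine the same attempt
      }
      return victims;
    }

Note the termination guarantee: every candidate goes onto the exclusion list whether or not it is currently being monitored, so findTaskToKill() cannot hand back the same attempt twice and the loop always makes progress.
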
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=718863&r1=718862&r2=718863&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue Nov 18 22:21:33 2008
@@ -1386,7 +1386,7 @@
       //we give up! do not accept new tasks until
       //all the ones running have finished and they're all cleared up
       synchronized (this) {
-        TaskInProgress killMe = findTaskToKill();
+        TaskInProgress killMe = findTaskToKill(null);
 
         if (killMe!=null) {
           String msg = "Tasktracker running out of space." +
@@ -1398,15 +1398,24 @@
       }
     }
   }
-    
+
   /**
-   * Pick a task to kill to free up space
+   * Pick a task to kill to free up memory/disk-space 
+   * @param tasksToExclude tasks that are to be excluded while trying to find a
+   *          task to kill. If null, all runningTasks will be searched.
    * @return the task to kill or null, if one wasn't found
    */
-  private TaskInProgress findTaskToKill() {
+  synchronized TaskInProgress findTaskToKill(List<TaskAttemptID> tasksToExclude) {
     TaskInProgress killMe = null;
     for (Iterator it = runningTasks.values().iterator(); it.hasNext();) {
       TaskInProgress tip = (TaskInProgress) it.next();
+
+      if (tasksToExclude != null
+          && tasksToExclude.contains(tip.getTask().getTaskID())) {
+        // exclude this task
+        continue;
+      }
+
       if ((tip.getRunState() == TaskStatus.State.RUNNING ||
            tip.getRunState() == TaskStatus.State.COMMIT_PENDING) &&
           !tip.wasKilled) {
@@ -1434,7 +1443,7 @@
     }
     return killMe;
   }
-    
+
   /**
    * Check if any of the local directories has enough
    * free space  (more than minSpace)
@@ -2944,15 +2953,17 @@
   /**
    * Clean-up the task that TaskMemoryMangerThread requests to do so.
    * @param tid
+   * @param wasFailure mark the task as failed or killed. 'failed' if true,
+   *          'killed' otherwise
    * @param diagnosticMsg
    */
-  synchronized void cleanUpOverMemoryTask(TaskAttemptID tid,
+  synchronized void cleanUpOverMemoryTask(TaskAttemptID tid, boolean wasFailure,
       String diagnosticMsg) {
     TaskInProgress tip = runningTasks.get(tid);
     if (tip != null) {
       tip.reportDiagnosticInfo(diagnosticMsg);
       try {
-        purgeTask(tip, true); // Marking it as failure.
+        purgeTask(tip, wasFailure); // Marking it as failed/killed.
       } catch (IOException ioe) {
         LOG.warn("Couldn't purge the task of " + tid + ". Error : " + ioe);
       }

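Two smaller API changes in this file carry the policy. findTaskToKill() now takes an exclusion list, with null preserving the old scan-everything behaviour used by the disk-space path, and cleanUpOverMemoryTask() gains a wasFailure flag that purgeTask() uses to mark the attempt failed versus killed. A hedged sketch of the two call sites; tid, msg and chosen are assumed locals, and the comment on job-level accounting reflects general MapReduce semantics rather than anything this diff changes:

    // Per-task overflow: the task broke its own limit, so fail it;
    // failed attempts count toward the job's retry accounting.
    taskTracker.cleanUpOverMemoryTask(tid, true, msg);

    // Cumulative overflow: the node is oversubscribed but the task is
    // within its own limit, so only kill it; killed attempts are not
    // charged as job failures (general MapReduce behaviour).
    taskTracker.cleanUpOverMemoryTask(tid, false, msg);

    // Victim search, skipping attempts already chosen in this pass;
    // passing null scans all running tasks, as the disk-space path does.
    TaskTracker.TaskInProgress victim = taskTracker.findTaskToKill(chosen);
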
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java?rev=718863&r1=718862&r2=718863&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java Tue Nov 18 22:21:33 2008
@@ -18,9 +18,7 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
+import java.io.IOException;
 import java.util.regex.Pattern;
 import java.util.regex.Matcher;
 
@@ -28,22 +26,28 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.examples.WordCount;
+import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import junit.framework.TestCase;
 
+/**
+ * Test class to verify memory management of tasks.
+ */
 public class TestTaskTrackerMemoryManager extends TestCase {
 
-  private static final Log LOG = LogFactory.getLog(TestTaskTrackerMemoryManager.class);
+  private static final Log LOG =
+      LogFactory.getLog(TestTaskTrackerMemoryManager.class);
   private MiniDFSCluster miniDFSCluster;
   private MiniMRCluster miniMRCluster;
 
+  private String taskOverLimitPatternString =
+      "TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond memory-limits. "
+          + "Current usage : [0-9]*kB. Limit : %skB. Killing task.";
+
   private void startCluster(JobConf conf) throws Exception {
     miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
     FileSystem fileSys = miniDFSCluster.getFileSystem();
@@ -61,55 +65,14 @@
     }
   }
 
-  private void runWordCount(JobConf conf) throws Exception {
-    Path input = new Path("input.txt");
-    Path output = new Path("output");
-
-    OutputStream os = miniDFSCluster.getFileSystem().create(input);
-    Writer wr = new OutputStreamWriter(os);
-    wr.write("hello1\n");
-    wr.write("hello2\n");
-    wr.write("hello3\n");
-    wr.write("hello4\n");
-    wr.close();
-
-    Tool WordCount = new WordCount();
-    if (conf != null) {
-      WordCount.setConf(conf);
-    }
-    ToolRunner.run(WordCount, new String[] { input.toString(),
-        output.toString() });
+  private void runSleepJob(JobConf conf) throws Exception {
+    String[] args = { "-m", "3", "-r", "1", "-mt", "3000", "-rt", "1000" };
+    ToolRunner.run(conf, new SleepJob(), args);
   }
 
-  public void testNormalTaskAndLimitedTT() throws Exception {
-    // Run the test only if memory management is enabled
-
-    try {
-      if (!ProcfsBasedProcessTree.isAvailable()) {
-        LOG.info("Currently ProcessTree has only one implementation "
-            + "ProcfsBasedProcessTree, which is not available on this "
-            + "system. Not testing");
-        return;
-      }
-    } catch (Exception e) {
-      LOG.info(StringUtils.stringifyException(e));
-      return;
-    }
-
-    Pattern diagMsgPattern = Pattern
-        .compile("TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond "
-            + "memory-limits. Current usage : [0-9]*kB. Limit : [0-9]*kB. Killing task.");
-    Matcher mat = null;
-
-    // Start cluster with proper configuration.
-    JobConf fConf = new JobConf();
-
-    fConf.setLong("mapred.tasktracker.tasks.maxmemory", 
-                      Long.valueOf(10000000000L)); // Fairly large value for WordCount to succeed
-    startCluster(fConf);
-
+  private void runAndCheckSuccessfulJob(JobConf conf)
+      throws IOException {
     // Set up job.
-    JobConf conf = new JobConf();
     JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker();
     conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":"
         + jt.getTrackerPort());
@@ -118,10 +81,14 @@
         + nn.getNameNodeAddress().getHostName() + ":"
         + nn.getNameNodeAddress().getPort());
 
+    Pattern taskOverLimitPattern =
+        Pattern.compile(String.format(taskOverLimitPatternString, "[0-9]*"));
+    Matcher mat = null;
+
     // Start the job.
     boolean success = true;
     try {
-      runWordCount(conf);
+      runSleepJob(conf);
       success = true;
     } catch (Exception e) {
       success = false;
@@ -130,8 +97,6 @@
     // Job has to succeed
     assertTrue(success);
 
-    // Alas, we don't have a way to get job id/Task completion events from
-    // WordCount
     JobClient jClient = new JobClient(conf);
     JobStatus[] jStatus = jClient.getAllJobs();
     JobStatus js = jStatus[0]; // Our only job
@@ -141,12 +106,12 @@
     TaskCompletionEvent[] taskComplEvents = rj.getTaskCompletionEvents(0);
 
     for (TaskCompletionEvent tce : taskComplEvents) {
-      String[] diagnostics = jClient.jobSubmitClient.getTaskDiagnostics(tce
-          .getTaskAttemptId());
+      String[] diagnostics =
+          jClient.jobSubmitClient.getTaskDiagnostics(tce.getTaskAttemptId());
 
       if (diagnostics != null) {
         for (String str : diagnostics) {
-          mat = diagMsgPattern.matcher(str);
+          mat = taskOverLimitPattern.matcher(str);
           // The error pattern shouldn't be there in any TIP's diagnostics
           assertFalse(mat.find());
         }
@@ -154,34 +119,123 @@
     }
   }
 
-  public void testOOMTaskAndLimitedTT() throws Exception {
-
-    // Run the test only if memory management is enabled
-
+  private boolean isProcfsBasedTreeAvailable() {
     try {
       if (!ProcfsBasedProcessTree.isAvailable()) {
         LOG.info("Currently ProcessTree has only one implementation "
             + "ProcfsBasedProcessTree, which is not available on this "
             + "system. Not testing");
-        return;
+        return false;
       }
     } catch (Exception e) {
       LOG.info(StringUtils.stringifyException(e));
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Test for verifying that nothing is killed when memory management is
+   * disabled on the TT, even when the tasks run over their limits.
+   * 
+   * @throws Exception
+   */
+  public void testTTLimitsDisabled()
+      throws Exception {
+    // Run the test only if memory management is enabled
+    if (!isProcfsBasedTreeAvailable()) {
+      return;
+    }
+
+    JobConf conf = new JobConf();
+    // Task-memory management disabled by default.
+    startCluster(conf);
+    long PER_TASK_LIMIT = 100L; // Doesn't matter how low.
+    conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
+    runAndCheckSuccessfulJob(conf);
+  }
+
+  /**
+   * Test for verifying that tasks with no limits, with the cumulative usage
+   * still under TT's limits, succeed.
+   * 
+   * @throws Exception
+   */
+  public void testTasksWithNoLimits()
+      throws Exception {
+    // Run the test only if memory management is enabled
+    if (!isProcfsBasedTreeAvailable()) {
+      return;
+    }
+
+    // Start cluster with proper configuration.
+    JobConf fConf = new JobConf();
+
+    // Fairly large value for sleepJob to succeed
+    fConf.setLong("mapred.tasktracker.tasks.maxmemory", 10000000000L);
+    startCluster(fConf);
+
+    // Set up job.
+    JobConf conf = new JobConf();
+    runAndCheckSuccessfulJob(conf);
+  }
+
+  /**
+   * Test for verifying that tasks within limits, with the cumulative usage also
+   * under TT's limits succeed.
+   * 
+   * @throws Exception
+   */
+  public void testTasksWithinLimits()
+      throws Exception {
+    // Run the test only if memory management is enabled
+    if (!isProcfsBasedTreeAvailable()) {
+      return;
+    }
+
+    long PER_TASK_LIMIT = 10000000000L; // Large so sleepjob goes through.
+    long TASK_TRACKER_LIMIT = 10000000000L; // Large so as to fit total usage
+
+    // Start cluster with proper configuration.
+    JobConf fConf = new JobConf();
+
+    // Fairly large value for sleepjob to succeed
+    fConf.setLong("mapred.tasktracker.tasks.maxmemory", TASK_TRACKER_LIMIT);
+    startCluster(fConf);
+
+    JobConf conf = new JobConf();
+    conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
+    runAndCheckSuccessfulJob(conf);
+  }
+
+  /**
+   * Test for verifying that tasks that go beyond limits, though the cumulative
+   * usage is under TT's limits, get killed.
+   * 
+   * @throws Exception
+   */
+  public void testTasksBeyondLimits()
+      throws Exception {
+
+    // Run the test only if memory management is enabled
+    if (!isProcfsBasedTreeAvailable()) {
       return;
     }
 
-    long PER_TASK_LIMIT = 444; // Enough to kill off WordCount.
-    Pattern diagMsgPattern = Pattern
-        .compile("TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond "
-            + "memory-limits. Current usage : [0-9]*kB. Limit : "
-            + PER_TASK_LIMIT + "kB. Killing task.");
+    long PER_TASK_LIMIT = 444; // Low enough to kill off sleepJob tasks.
+    long TASK_TRACKER_LIMIT = 10000000000L; // Large so as to fit total usage
+    Pattern taskOverLimitPattern =
+        Pattern.compile(String.format(taskOverLimitPatternString, String
+            .valueOf(PER_TASK_LIMIT)));
     Matcher mat = null;
 
     // Start cluster with proper configuration.
     JobConf fConf = new JobConf();
-    fConf.setLong("mapred.tasktracker.tasks.maxmemory", Long.valueOf(100000));
-    fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval", String.valueOf(300));
-            //very small value, so that no task escapes to successful completion.
+    fConf.setLong("mapred.tasktracker.tasks.maxmemory", TASK_TRACKER_LIMIT);
+
+    // very small value, so that no task escapes to successful completion.
+    fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval",
+        String.valueOf(300));
     startCluster(fConf);
 
     // Set up job.
@@ -198,7 +252,7 @@
     // Start the job.
     boolean success = true;
     try {
-      runWordCount(conf);
+      runSleepJob(conf);
       success = true;
     } catch (Exception e) {
       success = false;
@@ -207,8 +261,6 @@
     // Job has to fail
     assertFalse(success);
 
-    // Alas, we don't have a way to get job id/Task completion events from
-    // WordCount
     JobClient jClient = new JobClient(conf);
     JobStatus[] jStatus = jClient.getAllJobs();
     JobStatus js = jStatus[0]; // Our only job
@@ -222,18 +274,104 @@
       assert (tce.getTaskStatus() == TaskCompletionEvent.Status.TIPFAILED || tce
           .getTaskStatus() == TaskCompletionEvent.Status.FAILED);
 
-      String[] diagnostics = jClient.jobSubmitClient.getTaskDiagnostics(tce
-          .getTaskAttemptId());
+      String[] diagnostics =
+          jClient.jobSubmitClient.getTaskDiagnostics(tce.getTaskAttemptId());
 
       // Every task HAS to spit out the out-of-memory errors
       assert (diagnostics != null);
 
       for (String str : diagnostics) {
-        mat = diagMsgPattern.matcher(str);
+        mat = taskOverLimitPattern.matcher(str);
         // Every task HAS to spit out the out-of-memory errors in the same
         // format. And these are the only diagnostic messages.
         assertTrue(mat.find());
       }
     }
   }
-}
+
+  /**
+   * Test for verifying that tasks causing cumulative usage to go beyond TT's
+   * limit get killed even though they all are under individual limits. Memory
+   * management for tasks with disabled task-limits also traverses the same
+   * code-path, so we don't need a separate testTaskLimitsDisabled.
+   * 
+   * @throws Exception
+   */
+  public void testTasksCumulativelyExceedingTTLimits()
+      throws Exception {
+
+    // Run the test only if memory management is enabled
+    if (!isProcfsBasedTreeAvailable()) {
+      return;
+    }
+
+    // Large enough for SleepJob Tasks.
+    long PER_TASK_LIMIT = 100000000000L;
+    // Very Limited TT. All tasks will be killed.
+    long TASK_TRACKER_LIMIT = 100L;
+    Pattern taskOverLimitPattern =
+        Pattern.compile(String.format(taskOverLimitPatternString, String
+            .valueOf(PER_TASK_LIMIT)));
+    Pattern trackerOverLimitPattern =
+        Pattern.compile("Killing one of the least progress tasks - .*, as "
+            + "the cumulative memory usage of all the tasks on the TaskTracker"
+            + " exceeds virtual memory limit " + TASK_TRACKER_LIMIT + ".");
+    Matcher mat = null;
+
+    // Start cluster with proper configuration.
+    JobConf fConf = new JobConf();
+    fConf.setLong("mapred.tasktracker.tasks.maxmemory", TASK_TRACKER_LIMIT);
+    // very small value, so that no task escapes to successful completion.
+    fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval",
+        String.valueOf(300));
+
+    startCluster(fConf);
+
+    // Set up job.
+    JobConf conf = new JobConf();
+    conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
+    JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker();
+    conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":"
+        + jt.getTrackerPort());
+    NameNode nn = miniDFSCluster.getNameNode();
+    conf.set("fs.default.name", "hdfs://"
+        + nn.getNameNodeAddress().getHostName() + ":"
+        + nn.getNameNodeAddress().getPort());
+
+    JobClient jClient = new JobClient(conf);
+    SleepJob sleepJob = new SleepJob();
+    sleepJob.setConf(conf);
+    // Start the job
+    RunningJob job =
+        jClient.submitJob(sleepJob.setupJobConf(1, 1, 5000, 1, 1000, 1));
+    boolean TTOverFlowMsgPresent = false;
+    while (true) {
+      // Set-up tasks are the first to be launched.
+      TaskReport[] setUpReports = jt.getSetupTaskReports(job.getID());
+      for (TaskReport tr : setUpReports) {
+        String[] diag = tr.getDiagnostics();
+        for (String str : diag) {
+          mat = taskOverLimitPattern.matcher(str);
+          assertFalse(mat.find());
+          mat = trackerOverLimitPattern.matcher(str);
+          if (mat.find()) {
+            TTOverFlowMsgPresent = true;
+          }
+        }
+      }
+      if (TTOverFlowMsgPresent) {
+        break;
+      }
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // nothing
+      }
+    }
+    // If it comes here without a test-timeout, it means there was a task that
+    // was killed because of crossing cumulative TT limit.
+
+    // Test succeeded, kill the job.
+    job.killJob();
+  }
+}
\ No newline at end of file
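
A closing note on the rewritten test suite: every test is gated on ProcfsBasedProcessTree.isAvailable(), so the whole class is a no-op on platforms without a readable /proc. On a Linux checkout of this era the suite should be runnable with the usual ant target, e.g. ant test -Dtestcase=TestTaskTrackerMemoryManager (invocation hedged; exact target names depend on the build file).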