Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2008/11/19 07:21:34 UTC
svn commit: r718863 - in /hadoop/core/trunk: CHANGES.txt
src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
src/mapred/org/apache/hadoop/mapred/TaskTracker.java
src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
Author: yhemanth
Date: Tue Nov 18 22:21:33 2008
New Revision: 718863
URL: http://svn.apache.org/viewvc?rev=718863&view=rev
Log:
HADOOP-4523. Prevent too many tasks scheduled on a node from bringing it down by monitoring for cumulative memory usage across tasks. Contributed by Vinod Kumar Vavilapalli
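In outline, each monitoring cycle now sums the virtual memory of every task's process tree and checks the total against the tracker-wide limit: tasks over their own per-task limit are failed, and if the survivors still exceed the tracker limit, the tasks with the least progress are killed until the total fits. The following is a minimal, self-contained sketch of that selection logic; the class and field names here are hypothetical stand-ins, not the committed TaskMemoryManagerThread code shown in the diff below.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    class CumulativeMemoryCheckSketch {
      // Hypothetical per-task snapshot; the real code reads these values
      // from ProcfsBasedProcessTree and TaskInProgress.
      static class TaskSnapshot {
        final String taskId;
        final long usageKb;   // current process-tree virtual memory
        final long limitKb;   // per-task limit
        final double progress;
        TaskSnapshot(String id, long usage, long limit, double progress) {
          this.taskId = id; this.usageKb = usage;
          this.limitKb = limit; this.progress = progress;
        }
      }

      /** Returns ids of tasks to kill in one monitoring cycle. */
      static List<String> tasksToKill(List<TaskSnapshot> tasks, long ttLimitKb) {
        List<String> toKill = new ArrayList<String>();
        List<TaskSnapshot> survivors = new ArrayList<TaskSnapshot>();
        long memoryStillInUsage = 0;
        for (TaskSnapshot t : tasks) {
          if (t.usageKb > t.limitKb) {
            toKill.add(t.taskId);            // over its own limit: fail it
          } else {
            memoryStillInUsage += t.usageKb; // alive and within limits
            survivors.add(t);
          }
        }
        // If the surviving tasks still exceed the tracker-wide limit, shed
        // the ones with the least progress until the total fits. These are
        // marked killed (not failed), so they can be rescheduled.
        if (memoryStillInUsage > ttLimitKb) {
          Collections.sort(survivors, new Comparator<TaskSnapshot>() {
            public int compare(TaskSnapshot a, TaskSnapshot b) {
              return Double.compare(a.progress, b.progress);
            }
          });
          for (TaskSnapshot t : survivors) {
            if (memoryStillInUsage <= ttLimitKb) {
              break;
            }
            toKill.add(t.taskId);
            memoryStillInUsage -= t.usageKb;
          }
        }
        return toKill;
      }
    }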
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=718863&r1=718862&r2=718863&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Nov 18 22:21:33 2008
@@ -114,6 +114,10 @@
HADOOP-4185. Adds setVerifyChecksum() method to FileSystem.
(Sharad Agarwal via ddas)
+ HADOOP-4523. Prevent too many tasks scheduled on a node from bringing
+ it down by monitoring for cumulative memory usage across tasks.
+ (Vinod Kumar Vavilapalli via yhemanth)
+
OPTIMIZATIONS
HADOOP-3293. Fixes FileInputFormat to provide locations for splits
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=718863&r1=718862&r2=718863&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Tue Nov 18 22:21:33 2008
@@ -31,6 +31,7 @@
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TaskTracker;
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.util.ProcfsBasedProcessTree;
/**
@@ -45,6 +46,8 @@
private long monitoringInterval;
private long sleepTimeBeforeSigKill;
+ private long maxMemoryAllowedForAllTasks;
+
private Map<TaskAttemptID, ProcessTreeInfo> processTreeInfoMap;
private Map<TaskAttemptID, ProcessTreeInfo> tasksToBeAdded;
private List<TaskAttemptID> tasksToBeRemoved;
@@ -57,6 +60,8 @@
tasksToBeAdded = new HashMap<TaskAttemptID, ProcessTreeInfo>();
tasksToBeRemoved = new ArrayList<TaskAttemptID>();
+ maxMemoryAllowedForAllTasks = taskTracker.getMaxVirtualMemoryForTasks();
+
monitoringInterval = taskTracker.getJobConf().getLong(
"mapred.tasktracker.taskmemorymanager.monitoring-interval", 5000L);
sleepTimeBeforeSigKill = taskTracker.getJobConf().getLong(
@@ -155,6 +160,7 @@
tasksToBeRemoved.clear();
}
+ long memoryStillInUsage = 0;
// Now, check memory usage and kill any overflowing tasks
for (Iterator<Map.Entry<TaskAttemptID, ProcessTreeInfo>> it = processTreeInfoMap
.entrySet().iterator(); it.hasNext();) {
@@ -207,15 +213,28 @@
+ "] is running beyond memory-limits. Current usage : "
+ currentMemUsage + "kB. Limit : " + limit + "kB. Killing task.";
LOG.warn(msg);
- taskTracker.cleanUpOverMemoryTask(tid, msg);
+ taskTracker.cleanUpOverMemoryTask(tid, true, msg);
// Now destroy the ProcessTree, remove it from monitoring map.
pTree.destroy();
it.remove();
LOG.info("Removed ProcessTree with root " + pId);
+ } else {
+ // Account for the total memory in use by all tasks that are still
+ // alive and within limits.
+ memoryStillInUsage += currentMemUsage;
}
}
+ LOG.debug("Memory still in usage across all tasks : " + memoryStillInUsage
+ + "kB. Total limit : " + maxMemoryAllowedForAllTasks);
+
+ if (memoryStillInUsage > maxMemoryAllowedForAllTasks) {
+ LOG.warn("The total memory usage is still overflowing TTs limits."
+ + " Trying to kill a few tasks with the least progress.");
+ killTasksWithLeastProgress(memoryStillInUsage);
+ }
+
// Sleep for some time before beginning next cycle
try {
LOG.debug(this.getClass() + " : Sleeping for " + monitoringInterval
@@ -229,6 +248,55 @@
}
}
+ private void killTasksWithLeastProgress(long memoryStillInUsage) {
+
+ List<TaskAttemptID> tasksToKill = new ArrayList<TaskAttemptID>();
+ List<TaskAttemptID> tasksToExclude = new ArrayList<TaskAttemptID>();
+ // Find tasks to kill so as to get memory usage under limits.
+ while (memoryStillInUsage > maxMemoryAllowedForAllTasks) {
+ // Exclude tasks that are already marked for
+ // killing.
+ TaskInProgress task = taskTracker.findTaskToKill(tasksToExclude);
+ if (task == null) {
+ break; // couldn't find any more tasks to kill.
+ }
+
+ TaskAttemptID tid = task.getTask().getTaskID();
+ if (processTreeInfoMap.containsKey(tid)) {
+ ProcessTreeInfo ptInfo = processTreeInfoMap.get(tid);
+ ProcfsBasedProcessTree pTree = ptInfo.getProcessTree();
+ memoryStillInUsage -= pTree.getCumulativeVmem();
+ tasksToKill.add(tid);
+ }
+ // Exclude this task from the next search because it has already
+ // been considered.
+ tasksToExclude.add(tid);
+ }
+
+ // Now kill the tasks.
+ if (!tasksToKill.isEmpty()) {
+ for (TaskAttemptID tid : tasksToKill) {
+ String msg =
+ "Killing one of the least progress tasks - " + tid
+ + ", as the cumulative memory usage of all the tasks on "
+ + "the TaskTracker exceeds virtual memory limit "
+ + maxMemoryAllowedForAllTasks + ".";
+ LOG.warn(msg);
+ // Kill the task and mark it as killed.
+ taskTracker.cleanUpOverMemoryTask(tid, false, msg);
+ // Now destroy the ProcessTree, remove it from monitoring map.
+ ProcessTreeInfo ptInfo = processTreeInfoMap.get(tid);
+ ProcfsBasedProcessTree pTree = ptInfo.getProcessTree();
+ pTree.destroy();
+ processTreeInfoMap.remove(tid);
+ LOG.info("Removed ProcessTree with root " + ptInfo.getPID());
+ }
+ } else {
+ LOG.info("The total memory usage is overflowing TTs limits. "
+ + "But found no alive task to kill for freeing memory.");
+ }
+ }
+
/**
* Load pid of the task from the pidFile.
*
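For reference, the limits this thread enforces come from a small set of knobs, all of which appear in the diffs here. A brief configuration sketch follows; the keys are taken from the code above, and the values are illustrative only, loosely mirroring the test setup further below.

    // Configuration sketch; keys appear in the diff above, values are
    // illustrative only.
    import org.apache.hadoop.mapred.JobConf;

    class MemoryLimitsConfigSketch {
      static JobConf configure() {
        JobConf conf = new JobConf();
        // Tracker-wide cap on cumulative virtual memory across all tasks;
        // this is the limit the new cumulative check enforces.
        conf.setLong("mapred.tasktracker.tasks.maxmemory", 10000000000L);
        // How often TaskMemoryManagerThread samples usage, in milliseconds
        // (defaults to 5000 per the code above).
        conf.setLong(
            "mapred.tasktracker.taskmemorymanager.monitoring-interval", 5000L);
        // Per-task virtual memory limit, checked first for each task.
        conf.setMaxVirtualMemoryForTask(1000000000L);
        return conf;
      }
    }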
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=718863&r1=718862&r2=718863&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue Nov 18 22:21:33 2008
@@ -1386,7 +1386,7 @@
//we give up! do not accept new tasks until
//all the ones running have finished and they're all cleared up
synchronized (this) {
- TaskInProgress killMe = findTaskToKill();
+ TaskInProgress killMe = findTaskToKill(null);
if (killMe!=null) {
String msg = "Tasktracker running out of space." +
@@ -1398,15 +1398,24 @@
}
}
}
-
+
/**
- * Pick a task to kill to free up space
+ * Pick a task to kill to free up memory/disk-space
+ * @param tasksToExclude tasks that are to be excluded while trying to find a
+ * task to kill. If null, all runningTasks will be searched.
* @return the task to kill or null, if one wasn't found
*/
- private TaskInProgress findTaskToKill() {
+ synchronized TaskInProgress findTaskToKill(List<TaskAttemptID> tasksToExclude) {
TaskInProgress killMe = null;
for (Iterator it = runningTasks.values().iterator(); it.hasNext();) {
TaskInProgress tip = (TaskInProgress) it.next();
+
+ if (tasksToExclude != null
+ && tasksToExclude.contains(tip.getTask().getTaskID())) {
+ // exclude this task
+ continue;
+ }
+
if ((tip.getRunState() == TaskStatus.State.RUNNING ||
tip.getRunState() == TaskStatus.State.COMMIT_PENDING) &&
!tip.wasKilled) {
@@ -1434,7 +1443,7 @@
}
return killMe;
}
-
+
/**
* Check if any of the local directories has enough
* free space (more than minSpace)
@@ -2944,15 +2953,17 @@
/**
* Clean up a task when the TaskMemoryManagerThread requests it.
* @param tid
+ * @param wasFailure whether to mark the task as failed (if true) or
+ * killed (if false)
* @param diagnosticMsg
*/
- synchronized void cleanUpOverMemoryTask(TaskAttemptID tid,
+ synchronized void cleanUpOverMemoryTask(TaskAttemptID tid, boolean wasFailure,
String diagnosticMsg) {
TaskInProgress tip = runningTasks.get(tid);
if (tip != null) {
tip.reportDiagnosticInfo(diagnosticMsg);
try {
- purgeTask(tip, true); // Marking it as failure.
+ purgeTask(tip, wasFailure); // Marking it as failed/killed.
} catch (IOException ioe) {
LOG.warn("Couldn't purge the task of " + tid + ". Error : " + ioe);
}
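One point worth calling out in the change above: the new wasFailure flag separates the two reasons a task can be purged. A task over its own per-task limit is marked failed (its own fault, so it counts against the job), while a task shed only because the tracker's cumulative limit was exceeded is marked killed and remains eligible for rescheduling. A toy illustration of that mapping follows; the enum and method are hypothetical, not TaskTracker API.

    // Hypothetical illustration of the failed-vs-killed distinction
    // carried by the new 'wasFailure' parameter.
    class OverMemoryOutcomeSketch {
      enum Outcome { FAILED, KILLED }

      // true  -> task exceeded its own per-task limit: FAILED
      // false -> task shed to relieve the cumulative TT limit: KILLED
      static Outcome outcome(boolean wasFailure) {
        return wasFailure ? Outcome.FAILED : Outcome.KILLED;
      }
    }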
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java?rev=718863&r1=718862&r2=718863&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java Tue Nov 18 22:21:33 2008
@@ -18,9 +18,7 @@
package org.apache.hadoop.mapred;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
+import java.io.IOException;
import java.util.regex.Pattern;
import java.util.regex.Matcher;
@@ -28,22 +26,28 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.examples.WordCount;
+import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.util.ProcfsBasedProcessTree;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.util.Tool;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import junit.framework.TestCase;
+/**
+ * Test class to verify memory management of tasks.
+ */
public class TestTaskTrackerMemoryManager extends TestCase {
- private static final Log LOG = LogFactory.getLog(TestTaskTrackerMemoryManager.class);
+ private static final Log LOG =
+ LogFactory.getLog(TestTaskTrackerMemoryManager.class);
private MiniDFSCluster miniDFSCluster;
private MiniMRCluster miniMRCluster;
+ private String taskOverLimitPatternString =
+ "TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond memory-limits. "
+ + "Current usage : [0-9]*kB. Limit : %skB. Killing task.";
+
private void startCluster(JobConf conf) throws Exception {
miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
FileSystem fileSys = miniDFSCluster.getFileSystem();
@@ -61,55 +65,14 @@
}
}
- private void runWordCount(JobConf conf) throws Exception {
- Path input = new Path("input.txt");
- Path output = new Path("output");
-
- OutputStream os = miniDFSCluster.getFileSystem().create(input);
- Writer wr = new OutputStreamWriter(os);
- wr.write("hello1\n");
- wr.write("hello2\n");
- wr.write("hello3\n");
- wr.write("hello4\n");
- wr.close();
-
- Tool WordCount = new WordCount();
- if (conf != null) {
- WordCount.setConf(conf);
- }
- ToolRunner.run(WordCount, new String[] { input.toString(),
- output.toString() });
+ private void runSleepJob(JobConf conf) throws Exception {
+ String[] args = { "-m", "3", "-r", "1", "-mt", "3000", "-rt", "1000" };
+ ToolRunner.run(conf, new SleepJob(), args);
}
- public void testNormalTaskAndLimitedTT() throws Exception {
- // Run the test only if memory management is enabled
-
- try {
- if (!ProcfsBasedProcessTree.isAvailable()) {
- LOG.info("Currently ProcessTree has only one implementation "
- + "ProcfsBasedProcessTree, which is not available on this "
- + "system. Not testing");
- return;
- }
- } catch (Exception e) {
- LOG.info(StringUtils.stringifyException(e));
- return;
- }
-
- Pattern diagMsgPattern = Pattern
- .compile("TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond "
- + "memory-limits. Current usage : [0-9]*kB. Limit : [0-9]*kB. Killing task.");
- Matcher mat = null;
-
- // Start cluster with proper configuration.
- JobConf fConf = new JobConf();
-
- fConf.setLong("mapred.tasktracker.tasks.maxmemory",
- Long.valueOf(10000000000L)); // Fairly large value for WordCount to succeed
- startCluster(fConf);
-
+ private void runAndCheckSuccessfulJob(JobConf conf)
+ throws IOException {
// Set up job.
- JobConf conf = new JobConf();
JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker();
conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":"
+ jt.getTrackerPort());
@@ -118,10 +81,14 @@
+ nn.getNameNodeAddress().getHostName() + ":"
+ nn.getNameNodeAddress().getPort());
+ Pattern taskOverLimitPattern =
+ Pattern.compile(String.format(taskOverLimitPatternString, "[0-9]*"));
+ Matcher mat = null;
+
// Start the job.
boolean success = true;
try {
- runWordCount(conf);
+ runSleepJob(conf);
success = true;
} catch (Exception e) {
success = false;
@@ -130,8 +97,6 @@
// Job has to succeed
assertTrue(success);
- // Alas, we don't have a way to get job id/Task completion events from
- // WordCount
JobClient jClient = new JobClient(conf);
JobStatus[] jStatus = jClient.getAllJobs();
JobStatus js = jStatus[0]; // Our only job
@@ -141,12 +106,12 @@
TaskCompletionEvent[] taskComplEvents = rj.getTaskCompletionEvents(0);
for (TaskCompletionEvent tce : taskComplEvents) {
- String[] diagnostics = jClient.jobSubmitClient.getTaskDiagnostics(tce
- .getTaskAttemptId());
+ String[] diagnostics =
+ jClient.jobSubmitClient.getTaskDiagnostics(tce.getTaskAttemptId());
if (diagnostics != null) {
for (String str : diagnostics) {
- mat = diagMsgPattern.matcher(str);
+ mat = taskOverLimitPattern.matcher(str);
// The error pattern shouldn't be there in any TIP's diagnostics
assertFalse(mat.find());
}
@@ -154,34 +119,123 @@
}
}
- public void testOOMTaskAndLimitedTT() throws Exception {
-
- // Run the test only if memory management is enabled
-
+ private boolean isProcfsBasedTreeAvailable() {
try {
if (!ProcfsBasedProcessTree.isAvailable()) {
LOG.info("Currently ProcessTree has only one implementation "
+ "ProcfsBasedProcessTree, which is not available on this "
+ "system. Not testing");
- return;
+ return false;
}
} catch (Exception e) {
LOG.info(StringUtils.stringifyException(e));
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Test for verifying that nothing is killed when memory management is
+ * disabled on the TT, even when the tasks run over their limits.
+ *
+ * @throws Exception
+ */
+ public void testTTLimitsDisabled()
+ throws Exception {
+ // Run the test only if memory management is enabled
+ if (!isProcfsBasedTreeAvailable()) {
+ return;
+ }
+
+ JobConf conf = new JobConf();
+ // Task-memory management disabled by default.
+ startCluster(conf);
+ long PER_TASK_LIMIT = 100L; // Doesn't matter how low.
+ conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
+ runAndCheckSuccessfulJob(conf);
+ }
+
+ /**
+ * Test for verifying that tasks with no limits, with the cumulative usage
+ * still under TT's limits, succeed.
+ *
+ * @throws Exception
+ */
+ public void testTasksWithNoLimits()
+ throws Exception {
+ // Run the test only if memory management is enabled
+ if (!isProcfsBasedTreeAvailable()) {
+ return;
+ }
+
+ // Start cluster with proper configuration.
+ JobConf fConf = new JobConf();
+
+ // Fairly large value for sleepJob to succeed
+ fConf.setLong("mapred.tasktracker.tasks.maxmemory", 10000000000L);
+ startCluster(fConf);
+
+ // Set up job.
+ JobConf conf = new JobConf();
+ runAndCheckSuccessfulJob(conf);
+ }
+
+ /**
+ * Test for verifying that tasks within limits, with the cumulative usage
+ * also under TT's limits, succeed.
+ *
+ * @throws Exception
+ */
+ public void testTasksWithinLimits()
+ throws Exception {
+ // Run the test only if memory management is enabled
+ if (!isProcfsBasedTreeAvailable()) {
+ return;
+ }
+
+ long PER_TASK_LIMIT = 10000000000L; // Large so sleepjob goes through.
+ long TASK_TRACKER_LIMIT = 10000000000L; // Large so as to fit total usage
+
+ // Start cluster with proper configuration.
+ JobConf fConf = new JobConf();
+
+ // Fairly large value for sleepjob to succeed
+ fConf.setLong("mapred.tasktracker.tasks.maxmemory", TASK_TRACKER_LIMIT);
+ startCluster(fConf);
+
+ JobConf conf = new JobConf();
+ conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
+ runAndCheckSuccessfulJob(conf);
+ }
+
+ /**
+ * Test for verifying that tasks that go beyond limits, though the cumulative
+ * usage is under TT's limits, get killed.
+ *
+ * @throws Exception
+ */
+ public void testTasksBeyondLimits()
+ throws Exception {
+
+ // Run the test only if memory management is enabled
+ if (!isProcfsBasedTreeAvailable()) {
return;
}
- long PER_TASK_LIMIT = 444; // Enough to kill off WordCount.
- Pattern diagMsgPattern = Pattern
- .compile("TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond "
- + "memory-limits. Current usage : [0-9]*kB. Limit : "
- + PER_TASK_LIMIT + "kB. Killing task.");
+ long PER_TASK_LIMIT = 444; // Low enough to kill off sleepJob tasks.
+ long TASK_TRACKER_LIMIT = 10000000000L; // Large so as to fit total usage
+ Pattern taskOverLimitPattern =
+ Pattern.compile(String.format(taskOverLimitPatternString, String
+ .valueOf(PER_TASK_LIMIT)));
Matcher mat = null;
// Start cluster with proper configuration.
JobConf fConf = new JobConf();
- fConf.setLong("mapred.tasktracker.tasks.maxmemory", Long.valueOf(100000));
- fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval", String.valueOf(300));
- //very small value, so that no task escapes to successful completion.
+ fConf.setLong("mapred.tasktracker.tasks.maxmemory", TASK_TRACKER_LIMIT);
+
+ // very small value, so that no task escapes to successful completion.
+ fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval",
+ String.valueOf(300));
startCluster(fConf);
// Set up job.
@@ -198,7 +252,7 @@
// Start the job.
boolean success = true;
try {
- runWordCount(conf);
+ runSleepJob(conf);
success = true;
} catch (Exception e) {
success = false;
@@ -207,8 +261,6 @@
// Job has to fail
assertFalse(success);
- // Alas, we don't have a way to get job id/Task completion events from
- // WordCount
JobClient jClient = new JobClient(conf);
JobStatus[] jStatus = jClient.getAllJobs();
JobStatus js = jStatus[0]; // Our only job
@@ -222,18 +274,104 @@
assert (tce.getTaskStatus() == TaskCompletionEvent.Status.TIPFAILED || tce
.getTaskStatus() == TaskCompletionEvent.Status.FAILED);
- String[] diagnostics = jClient.jobSubmitClient.getTaskDiagnostics(tce
- .getTaskAttemptId());
+ String[] diagnostics =
+ jClient.jobSubmitClient.getTaskDiagnostics(tce.getTaskAttemptId());
// Every task HAS to spit out the out-of-memory errors
assert (diagnostics != null);
for (String str : diagnostics) {
- mat = diagMsgPattern.matcher(str);
+ mat = taskOverLimitPattern.matcher(str);
// Every task HAS to spit out the out-of-memory errors in the same
// format. And these are the only diagnostic messages.
assertTrue(mat.find());
}
}
}
-}
+
+ /**
+ * Test for verifying that tasks causing cumulative usage to go beyond TT's
+ * limit get killed even though they are all under their individual
+ * limits. Memory management for tasks with disabled task-limits also
+ * traverses the same code-path, so we don't need a separate
+ * testTaskLimitsDisabled.
+ *
+ * @throws Exception
+ */
+ public void testTasksCumulativelyExceedingTTLimits()
+ throws Exception {
+
+ // Run the test only if memory management is enabled
+ if (!isProcfsBasedTreeAvailable()) {
+ return;
+ }
+
+ // Large enough for SleepJob Tasks.
+ long PER_TASK_LIMIT = 100000000000L;
+ // Very Limited TT. All tasks will be killed.
+ long TASK_TRACKER_LIMIT = 100L;
+ Pattern taskOverLimitPattern =
+ Pattern.compile(String.format(taskOverLimitPatternString, String
+ .valueOf(PER_TASK_LIMIT)));
+ Pattern trackerOverLimitPattern =
+ Pattern.compile("Killing one of the least progress tasks - .*, as "
+ + "the cumulative memory usage of all the tasks on the TaskTracker"
+ + " exceeds virtual memory limit " + TASK_TRACKER_LIMIT + ".");
+ Matcher mat = null;
+
+ // Start cluster with proper configuration.
+ JobConf fConf = new JobConf();
+ fConf.setLong("mapred.tasktracker.tasks.maxmemory", TASK_TRACKER_LIMIT);
+ // very small value, so that no task escapes to successful completion.
+ fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval",
+ String.valueOf(300));
+
+ startCluster(fConf);
+
+ // Set up job.
+ JobConf conf = new JobConf();
+ conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
+ JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker();
+ conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":"
+ + jt.getTrackerPort());
+ NameNode nn = miniDFSCluster.getNameNode();
+ conf.set("fs.default.name", "hdfs://"
+ + nn.getNameNodeAddress().getHostName() + ":"
+ + nn.getNameNodeAddress().getPort());
+
+ JobClient jClient = new JobClient(conf);
+ SleepJob sleepJob = new SleepJob();
+ sleepJob.setConf(conf);
+ // Start the job
+ RunningJob job =
+ jClient.submitJob(sleepJob.setupJobConf(1, 1, 5000, 1, 1000, 1));
+ boolean TTOverFlowMsgPresent = false;
+ while (true) {
+ // Set-up tasks are the first to be launched.
+ TaskReport[] setUpReports = jt.getSetupTaskReports(job.getID());
+ for (TaskReport tr : setUpReports) {
+ String[] diag = tr.getDiagnostics();
+ for (String str : diag) {
+ mat = taskOverLimitPattern.matcher(str);
+ assertFalse(mat.find());
+ mat = trackerOverLimitPattern.matcher(str);
+ if (mat.find()) {
+ TTOverFlowMsgPresent = true;
+ }
+ }
+ }
+ if (TTOverFlowMsgPresent) {
+ break;
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // nothing
+ }
+ }
+ // If we get here without the test timing out, some task was killed for
+ // crossing the cumulative TT limit.
+
+ // Test succeeded, kill the job.
+ job.killJob();
+ }
+}
\ No newline at end of file