You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/06/28 17:28:27 UTC
svn commit: r417789 - in /lucene/hadoop/trunk: CHANGES.txt
conf/hadoop-default.xml src/java/org/apache/hadoop/mapred/TaskTracker.java
Author: cutting
Date: Wed Jun 28 08:28:27 2006
New Revision: 417789
URL: http://svn.apache.org/viewvc?rev=417789&view=rev
Log:
HADOOP-27. Don't allocate tasks to trackers whose local free space is too low.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/conf/hadoop-default.xml
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=417789&r1=417788&r2=417789&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Jun 28 08:28:27 2006
@@ -77,6 +77,9 @@
18. HADOOP-328. Add an option to the "distcp" command to ignore read
errors while copying. (omalley via cutting)
+19. HADOOP-27. Don't allocate tasks to trackers whose local free
+ space is too low. (Johan Oskarson via cutting)
+
Release 0.3.2 - 2006-06-09
Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=417789&r1=417788&r2=417789&view=diff
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Wed Jun 28 08:28:27 2006
@@ -234,6 +234,27 @@
</property>
<property>
+ <name>mapred.local.dir.minspacestart</name>
+ <value>0</value>
+ <description>If the space in mapred.local.dir drops under this,
+ do not ask for more tasks.
+ Value in bytes.
+ </description>
+</property>
+
+<property>
+ <name>mapred.local.dir.minspacekill</name>
+ <value>0</value>
+ <description>If the space in mapred.local.dir drops under this,
+ do not ask for more tasks until all the current ones have finished and
+ cleaned up. Also, to save the rest of the tasks we have running,
+ kill one of them, to clean up some space. Start with the reduce tasks,
+ then go with the ones that have finished the least.
+ Value in bytes.
+ </description>
+</property>
+
+<property>
<name>mapred.map.tasks</name>
<value>2</value>
<description>The default number of map tasks per job. Typically set
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=417789&r1=417788&r2=417789&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Jun 28 08:28:27 2006
@@ -65,6 +65,18 @@
int mapTotal = 0;
int reduceTotal = 0;
boolean justStarted = true;
+
+ //dir -> DF
+ Map localDirsDf = new HashMap();
+ long minSpaceStart = 0;
+ //must have this much space free to start new tasks
+ boolean acceptNewTasks = true;
+ long minSpaceKill = 0;
+ //if we run under this limit, kill one task
+ //and make sure we never receive any new jobs
+ //until all the old tasks have been cleaned up.
+ //this is if a machine is so full it's only good
+ //for serving map output to the other nodes
static Random r = new Random();
FileSystem fs = null;
@@ -119,7 +131,12 @@
this.runningTasks = new TreeMap();
this.mapTotal = 0;
this.reduceTotal = 0;
-
+ this.acceptNewTasks = true;
+
+ this.minSpaceStart = this.fConf.getLong("mapred.local.dir.minspacestart", 0L);
+ this.minSpaceKill = this.fConf.getLong("mapred.local.dir.minspacekill", 0L);
+
+
// port numbers
this.taskReportPort = this.fConf.getInt("mapred.task.tracker.report.port", 50050);
@@ -331,11 +348,14 @@
// Check if we should create a new Task
//
try {
- if (mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) {
+ if ((mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) && acceptNewTasks) {
checkLocalDirs(fConf.getLocalDirs());
- Task t = jobClient.pollForNewTask(taskTrackerName);
- if (t != null) {
- startNewTask(t);
+
+ if (enoughFreeSpace(minSpaceStart)) {
+ Task t = jobClient.pollForNewTask(taskTrackerName);
+ if (t != null) {
+ startNewTask(t);
+ }
}
}
} catch (DiskErrorException de ) {
@@ -403,12 +423,99 @@
LOG.info("Problem getting closed tasks: " +
StringUtils.stringifyException(ie));
}
+
+ //Check if we're dangerously low on disk space
+ // If so, kill one task to free up space and make sure
+ // we don't accept any new tasks
+ // Try killing a reduce task first, since I believe they
+ // use up most space
+ // Then pick the one with the least progress
+
+ if (!enoughFreeSpace(minSpaceKill)) {
+ acceptNewTasks=false;
+ //we give up! do not accept new tasks until
+ //all the ones running have finished and they're all cleared up
+ synchronized (this) {
+ TaskInProgress killMe = null;
+
+ for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) {
+ TaskInProgress tip = (TaskInProgress) it.next();
+ if ((tip.getRunState() == TaskStatus.RUNNING) &&
+ !tip.wasKilled) {
+
+ if (killMe == null) {
+ killMe = tip;
+
+ } else if (!tip.getTask().isMapTask()) {
+ //reduce task, give priority
+ if (killMe.getTask().isMapTask() ||
+ (tip.getTask().getProgress().get() <
+ killMe.getTask().getProgress().get())) {
+
+ killMe = tip;
+ }
+
+ } else if (killMe.getTask().isMapTask() &&
+ tip.getTask().getProgress().get() <
+ killMe.getTask().getProgress().get()) {
+ //map task, only add if the progress is lower
+
+ killMe = tip;
+ }
+ }
+ }
+
+ if (killMe!=null) {
+ String msg = "Tasktracker running out of space. Killing task.";
+ LOG.info(killMe.getTask().getTaskId() + ": " + msg);
+ killMe.reportDiagnosticInfo(msg);
+ try {
+ killMe.killAndCleanup(true);
+ } catch (IOException ie) {
+ LOG.info("Problem cleaning task up: " +
+ StringUtils.stringifyException(ie));
+ }
+ }
+ }
+ }
+
+
+ //we've cleaned up, resume normal operation
+ if (!acceptNewTasks && tasks.isEmpty()) {
+ acceptNewTasks=true;
+ }
}
return 0;
}
/**
+ * Check if all of the local directories have enough
+ * free space
+ *
+ * If not, do not try to get a new task assigned
+ * @return true if no local directory reports less than minSpace available
+ * @throws IOException
+ */
+ private boolean enoughFreeSpace(long minSpace) throws IOException {
+ String[] localDirs = fConf.getLocalDirs();
+ for (int i = 0; i < localDirs.length; i++) {
+ DF df = null;
+ if (localDirsDf.containsKey(localDirs[i])) {
+ df = (DF) localDirsDf.get(localDirs[i]);
+ } else {
+ df = new DF(localDirs[i], fConf);
+ localDirsDf.put(localDirs[i], df);
+ }
+
+ if (df.getAvailable() < minSpace)
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
* Start a new task.
* All exceptions are handled locally, so that we don't mess up the
* task tracker.