You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/10/24 23:18:35 UTC
svn commit: r467488 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/ipc/ src/java/org/apache/hadoop/mapred/
src/java/org/apache/hadoop/util/
Author: cutting
Date: Tue Oct 24 14:18:34 2006
New Revision: 467488
URL: http://svn.apache.org/viewvc?view=rev&rev=467488
Log:
HADOOP-610. Fix TaskTracker to survive more exceptions, keeping tasks from becoming lost.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=467488&r1=467487&r2=467488
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Oct 24 14:18:34 2006
@@ -31,6 +31,9 @@
locally, if possible. This was the intent, but there as a bug.
(Dhruba Borthakur via cutting)
+ 9. HADOOP-610. Fix TaskTracker to survive more exceptions, keeping
+ tasks from becoming lost. (omalley via cutting)
+
Release 0.7.2 - 2006-10-18
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?view=diff&rev=467488&r1=467487&r2=467488
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Tue Oct 24 14:18:34 2006
@@ -47,6 +47,7 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.SocketChannelOutputStream;
+import org.apache.hadoop.util.*;
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
@@ -497,6 +498,7 @@
// throw the message away if it is too old
if (System.currentTimeMillis() - call.receivedTime >
maxCallStartAge) {
+ ReflectionUtils.logThreadInfo(LOG, "Discarding call " + call, 30);
LOG.warn("Call " + call.toString() +
" discarded for being too old (" +
(System.currentTimeMillis() - call.receivedTime) + ")");
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?view=diff&rev=467488&r1=467487&r2=467488
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Tue Oct 24 14:18:34 2006
@@ -53,7 +53,7 @@
}
public void progress(String taskid, float progress, String state,
- Phase phase) throws IOException {
+ TaskStatus.Phase phase) throws IOException {
StringBuffer buf = new StringBuffer("Task ");
buf.append(taskid);
buf.append(" making progress to ");
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=467488&r1=467487&r2=467488
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Oct 24 14:18:34 2006
@@ -657,7 +657,7 @@
* @param trackerName The task tracker the task failed on
*/
public void failedTask(TaskInProgress tip, String taskid,
- String reason, Phase phase,
+ String reason, TaskStatus.Phase phase,
String hostname, String trackerName,
JobTrackerMetrics metrics) {
TaskStatus status = new TaskStatus(taskid,
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=467488&r1=467487&r2=467488
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Oct 24 14:18:34 2006
@@ -69,7 +69,8 @@
tracker = new JobTracker(conf);
break;
} catch (IOException e) {
- LOG.warn("Starting tracker", e);
+ LOG.warn("Error starting tracker: " +
+ StringUtils.stringifyException(e));
}
try {
Thread.sleep(1000);
@@ -139,7 +140,8 @@
TaskTrackerStatus trackerStatus =
getTaskTracker(trackerName);
job.failedTask(tip, taskId, "Error launching task",
- tip.isMapTask()?Phase.MAP:Phase.STARTING,
+ tip.isMapTask()? TaskStatus.Phase.MAP:
+ TaskStatus.Phase.STARTING,
trackerStatus.getHost(), trackerName,
myMetrics);
}
@@ -1209,7 +1211,8 @@
// if the job is done, we don't want to change anything
if (job.getStatus().getRunState() == JobStatus.RUNNING) {
job.failedTask(tip, taskId, "Lost task tracker",
- Phase.MAP, hostname, trackerName, myMetrics);
+ TaskStatus.Phase.MAP, hostname, trackerName,
+ myMetrics);
}
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=467488&r1=467487&r2=467488
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Oct 24 14:18:34 2006
@@ -157,7 +157,8 @@
public Task getTask(String taskid) { return null; }
- public void progress(String taskId, float progress, String state, Phase phase) {
+ public void progress(String taskId, float progress, String state,
+ TaskStatus.Phase phase) {
LOG.info(state);
float taskIndex = mapIds.indexOf(taskId);
if (taskIndex >= 0) { // mapping
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=467488&r1=467487&r2=467488
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Tue Oct 24 14:18:34 2006
@@ -46,7 +46,7 @@
}
{ // set phase for this task
- setPhase(Phase.MAP);
+ setPhase(TaskStatus.Phase.MAP);
}
private class MapTaskMetrics {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=467488&r1=467487&r2=467488
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Oct 24 14:18:34 2006
@@ -67,7 +67,7 @@
{
getProgress().setStatus("reduce");
- setPhase(Phase.SHUFFLE); // phase to start with
+ setPhase(TaskStatus.Phase.SHUFFLE); // phase to start with
}
private Progress copyPhase = getProgress().addPhase("copy");
@@ -236,7 +236,7 @@
WritableComparator comparator = job.getOutputKeyComparator();
try {
- setPhase(Phase.SORT) ;
+ setPhase(TaskStatus.Phase.SORT) ;
sortProgress.start();
// sort the input file
@@ -249,7 +249,7 @@
}
sortPhase.complete(); // sort is complete
- setPhase(Phase.REDUCE);
+ setPhase(TaskStatus.Phase.REDUCE);
Reporter reporter = getReporter(umbilical, getProgress());
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=467488&r1=467487&r2=467488
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Tue Oct 24 14:18:34 2006
@@ -37,7 +37,7 @@
private String taskId; // unique, includes job id
private String jobId; // unique jobid
private int partition; // id within job
- private Phase phase ; // current phase of the task
+ private TaskStatus.Phase phase ; // current phase of the task
////////////////////////////////////////////
// Constructors
@@ -78,14 +78,14 @@
* Return current phase of the task.
* @return
*/
- public Phase getPhase(){
+ public TaskStatus.Phase getPhase(){
return this.phase ;
}
/**
* Set current phase of the task.
* @param p
*/
- protected void setPhase(Phase p){
+ protected void setPhase(TaskStatus.Phase p){
this.phase = p ;
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java?view=diff&rev=467488&r1=467487&r2=467488
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java Tue Oct 24 14:18:34 2006
@@ -18,9 +18,6 @@
import org.apache.hadoop.io.*;
import java.io.*;
-// enumeration for reporting current phase of a task.
-enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE}
-
/**************************************************
* Describes the current status of a task. This is
* not intended to be a comprehensive piece of data.
@@ -28,7 +25,11 @@
* @author Mike Cafarella
**************************************************/
class TaskStatus implements Writable {
- public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED};
+ //enumeration for reporting current phase of a task.
+ public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE}
+
+ // what state is the task in?
+ public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED}
private String taskid;
private boolean isMap;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=467488&r1=467487&r2=467488
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Oct 24 14:18:34 2006
@@ -50,7 +50,7 @@
private long taskTimeout;
private int httpPort;
- static final int STALE_STATE = 1;
+ static enum State {NORMAL, STALE, INTERRUPTED}
public static final Log LOG =
LogFactory.getLog("org.apache.hadoop.mapred.TaskTracker");
@@ -71,12 +71,12 @@
boolean shuttingDown = false;
- TreeMap tasks = null;
+ Map<String, TaskInProgress> tasks = null;
/**
* Map from taskId -> TaskInProgress.
*/
- TreeMap runningTasks = null;
- Map runningJobs = null;
+ Map<String, TaskInProgress> runningTasks = null;
+ Map<String, RunningJob> runningJobs = null;
int mapTotal = 0;
int reduceTotal = 0;
boolean justStarted = true;
@@ -151,6 +151,41 @@
taskCleanupThread.start();
}
+ private RunningJob addTaskToJob(String jobId,
+ Path localJobFile,
+ TaskInProgress tip) {
+ synchronized (runningJobs) {
+ RunningJob rJob = null;
+ if (!runningJobs.containsKey(jobId)) {
+ rJob = new RunningJob(localJobFile);
+ rJob.localized = false;
+ rJob.tasks = new HashSet();
+ rJob.jobFile = localJobFile;
+ runningJobs.put(jobId, rJob);
+ } else {
+ rJob = runningJobs.get(jobId);
+ }
+ rJob.tasks.add(tip);
+ return rJob;
+ }
+ }
+
+ private void removeTaskFromJob(String jobId, TaskInProgress tip) {
+ synchronized (runningJobs) {
+ RunningJob rjob = runningJobs.get(jobId);
+ if (rjob == null) {
+ LOG.warn("Unknown job " + jobId + " being deleted.");
+ } else {
+ synchronized (rjob) {
+ rjob.tasks.remove(tip);
+ if (rjob.tasks.isEmpty()) {
+ runningJobs.remove(jobId);
+ }
+ }
+ }
+ }
+ }
+
static String getCacheSubdir() {
return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR;
}
@@ -229,30 +264,14 @@
private void localizeJob(TaskInProgress tip) throws IOException {
Path localJarFile = null;
Task t = tip.getTask();
- Path localJobFile = new Path(fConf.getLocalPath(getJobCacheSubdir()), (t
- .getJobId()
- + Path.SEPARATOR + "job.xml"));
- RunningJob rjob = null;
- synchronized (runningJobs) {
- if (!runningJobs.containsKey(t.getJobId())) {
- rjob = new RunningJob();
- rjob.localized = false;
- rjob.tasks = new ArrayList();
- rjob.jobFile = localJobFile;
- rjob.tasks.add(tip);
- runningJobs.put(t.getJobId(), rjob);
- } else {
- rjob = (RunningJob) runningJobs.get(t.getJobId());
- // keep this for later use when we just get a jobid to delete
- // the data for
- rjob.tasks.add(tip);
- }
- }
+ String jobId = t.getJobId();
+ Path localJobFile = new Path(fConf.getLocalPath(getJobCacheSubdir()),
+ jobId + Path.SEPARATOR + "job.xml");
+ RunningJob rjob = addTaskToJob(jobId, localJobFile, tip);
synchronized (rjob) {
if (!rjob.localized) {
- localJarFile = new Path(fConf.getLocalPath(getJobCacheSubdir()), (t
- .getJobId())
- + Path.SEPARATOR + "job.jar");
+ localJarFile = new Path(fConf.getLocalPath(getJobCacheSubdir()),
+ jobId + Path.SEPARATOR + "job.jar");
String jobFile = t.getJobFile();
fs.copyToLocalFile(new Path(jobFile), localJobFile);
@@ -385,28 +404,7 @@
public InterTrackerProtocol getJobClient() {
return jobClient;
}
-
- /**
- * Are we running under killall-less operating system.
- */
- private static boolean isWindows =
- System.getProperty("os.name").startsWith("Windows");
-
- /**
- * Get the call stacks for all java processes on this system.
- * Obviously, this is only useful for debugging.
- */
- private static void getCallStacks() {
- if (LOG.isDebugEnabled() && !isWindows) {
- try {
- Process proc =
- Runtime.getRuntime().exec("killall -QUIT java");
- proc.waitFor();
- } catch (IOException ie) {
- LOG.warn(StringUtils.stringifyException(ie));
- } catch (InterruptedException ie) {}
- }
- }
+
/**Return the DFS filesystem
* @return
*/
@@ -417,220 +415,227 @@
/**
* Main service loop. Will stay in this loop forever.
*/
- int offerService() throws Exception {
+ State offerService() throws Exception {
long lastHeartbeat = 0;
this.fs = FileSystem.getNamed(jobClient.getFilesystemName(), this.fConf);
while (running && !shuttingDown) {
+ try {
long now = System.currentTimeMillis();
long waitTime = HEARTBEAT_INTERVAL - (now - lastHeartbeat);
if (waitTime > 0) {
- try {
- // sleeps for the wait time, wakes up if a task is finished.
- synchronized(finishedCount) {
- if (finishedCount[0] == 0) {
- finishedCount.wait(waitTime);
- }
- finishedCount[0] = 0;
- }
- } catch (InterruptedException ie) {
- }
- }
- lastHeartbeat = now;
-
- //
- // Emit standard hearbeat message to check in with JobTracker
- //
- Vector taskReports = new Vector();
- synchronized (this) {
- for (Iterator it = runningTasks.values().iterator();
- it.hasNext(); ) {
- TaskInProgress tip = (TaskInProgress) it.next();
- TaskStatus status = tip.createStatus();
- taskReports.add(status);
+ // sleeps for the wait time, wakes up if a task is finished.
+ synchronized(finishedCount) {
+ if (finishedCount[0] == 0) {
+ finishedCount.wait(waitTime);
}
- }
-
- //
- // Xmit the heartbeat
- //
-
- TaskTrackerStatus status =
- new TaskTrackerStatus(taskTrackerName, localHostname,
- httpPort, taskReports,
- failures);
- int resultCode = jobClient.emitHeartbeat(status, justStarted);
- synchronized (this) {
- for (Iterator it = taskReports.iterator();
- it.hasNext(); ) {
- TaskStatus taskStatus = (TaskStatus) it.next();
- if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
- if (taskStatus.getIsMap()) {
- mapTotal--;
- } else {
- reduceTotal--;
- }
- myMetrics.completeTask();
- runningTasks.remove(taskStatus.getTaskId());
- }
+ finishedCount[0] = 0;
}
}
- justStarted = false;
-
- if (resultCode == InterTrackerProtocol.UNKNOWN_TASKTRACKER) {
- return STALE_STATE;
- }
- //
- // Check if we should createRecord a new Task
- //
- try {
- if ((mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) && acceptNewTasks) {
- checkLocalDirs(fConf.getLocalDirs());
-
- if (enoughFreeSpace(minSpaceStart)) {
- Task t = jobClient.pollForNewTask(taskTrackerName);
- if (t != null) {
- startNewTask(t);
- }
- }
- }
- } catch (DiskErrorException de ) {
- LOG.warn("Exiting task tracker because "+de.getMessage());
- jobClient.reportTaskTrackerError(taskTrackerName,
- "DiskErrorException", de.getMessage());
- return STALE_STATE;
- } catch (IOException ie) {
- LOG.info("Problem launching task: " +
- StringUtils.stringifyException(ie));
+ if (!transmitHeartBeat()) {
+ return State.STALE;
}
+ lastHeartbeat = now;
+ justStarted = false;
- //
- // Kill any tasks that have not reported progress in the last X seconds.
- //
- synchronized (this) {
- for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) {
- TaskInProgress tip = (TaskInProgress) it.next();
- long timeSinceLastReport = System.currentTimeMillis() -
- tip.getLastProgressReport();
- if ((tip.getRunState() == TaskStatus.State.RUNNING) &&
- (timeSinceLastReport > this.taskTimeout) &&
- !tip.wasKilled) {
- String msg = "Task failed to report status for " +
- (timeSinceLastReport / 1000) +
- " seconds. Killing.";
- LOG.info(tip.getTask().getTaskId() + ": " + msg);
- getCallStacks();
- tip.reportDiagnosticInfo(msg);
- try {
- tip.killAndCleanup(true);
- } catch (IOException ie) {
- LOG.info("Problem cleaning task up: " +
- StringUtils.stringifyException(ie));
- }
- }
- }
+ checkForNewTasks();
+ markUnresponsiveTasks();
+ closeCompletedTasks();
+ killOverflowingTasks();
+
+ //we've cleaned up, resume normal operation
+ if (!acceptNewTasks && tasks.isEmpty()) {
+ acceptNewTasks=true;
}
+ } catch (InterruptedException ie) {
+ LOG.info("Interrupted. Closing down.");
+ return State.INTERRUPTED;
+ } catch (DiskErrorException de) {
+ String msg = "Exiting task tracker for disk error:\n" +
+ StringUtils.stringifyException(de);
+ LOG.error(msg);
+ jobClient.reportTaskTrackerError(taskTrackerName,
+ "DiskErrorException", msg);
+ return State.STALE;
+ } catch (Exception except) {
+ String msg = "Caught exception: " +
+ StringUtils.stringifyException(except);
+ LOG.error(msg);
+ }
+ }
- //
- // Check for any Tasks that should be killed, even if
- // the containing Job is still ongoing. (This happens
- // with speculative execution, when one version of the
- // task finished before another
- //
+ return State.NORMAL;
+ }
- //
- // Check for any Tasks whose job may have ended
- //
- try {
- String[] toCloseIds = jobClient.pollForTaskWithClosedJob(taskTrackerName);
- if (toCloseIds != null) {
- synchronized (this) {
- for (int i = 0; i < toCloseIds.length; i++) {
- Object tip = tasks.get(toCloseIds[i]);
- synchronized(runningJobs){
- runningJobs.remove(((TaskInProgress)
- tasks.get(toCloseIds[i])).getTask().getJobId());
- }
- if (tip != null) {
- tasksToCleanup.put(tip);
- } else {
- LOG.info("Attempt to cleanup unknown tip " + toCloseIds[i]);
- }
+ /**
+ * Build and transmit the heart beat to the JobTracker
+ * @return false if the tracker was unknown
+ * @throws IOException
+ */
+ private boolean transmitHeartBeat() throws IOException {
+ //
+ // Build the heartbeat information for the JobTracker
+ //
+ List<TaskStatus> taskReports = new ArrayList(runningTasks.size());
+ synchronized (this) {
+ for (TaskInProgress tip: runningTasks.values()) {
+ taskReports.add(tip.createStatus());
+ }
+ }
+ TaskTrackerStatus status =
+ new TaskTrackerStatus(taskTrackerName, localHostname,
+ httpPort, taskReports,
+ failures);
+
+ //
+ // Xmit the heartbeat
+ //
+
+ int resultCode = jobClient.emitHeartbeat(status, justStarted);
+ synchronized (this) {
+ for (TaskStatus taskStatus: taskReports) {
+ if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+ if (taskStatus.getIsMap()) {
+ mapTotal--;
+ } else {
+ reduceTotal--;
}
- }
- }
- } catch (IOException ie) {
- LOG.info("Problem getting closed tasks: " +
- StringUtils.stringifyException(ie));
+ myMetrics.completeTask();
+ runningTasks.remove(taskStatus.getTaskId());
}
-
- //Check if we're dangerously low on disk space
- // If so, kill jobs to free up space and make sure
- // we don't accept any new tasks
- // Try killing the reduce jobs first, since I believe they
- // use up most space
- // Then pick the one with least progress
-
- if (!enoughFreeSpace(minSpaceKill)) {
- acceptNewTasks=false;
- //we give up! do not accept new tasks until
- //all the ones running have finished and they're all cleared up
- synchronized (this) {
- TaskInProgress killMe = null;
-
- for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) {
- TaskInProgress tip = (TaskInProgress) it.next();
- if ((tip.getRunState() == TaskStatus.State.RUNNING) &&
- !tip.wasKilled) {
-
- if (killMe == null) {
- killMe = tip;
-
- } else if (!tip.getTask().isMapTask()) {
- //reduce task, give priority
- if (killMe.getTask().isMapTask() ||
- (tip.getTask().getProgress().get() <
- killMe.getTask().getProgress().get())) {
-
- killMe = tip;
- }
-
- } else if (killMe.getTask().isMapTask() &&
- tip.getTask().getProgress().get() <
- killMe.getTask().getProgress().get()) {
- //map task, only add if the progress is lower
-
- killMe = tip;
- }
- }
- }
+ }
+ }
+ return resultCode != InterTrackerProtocol.UNKNOWN_TASKTRACKER;
+ }
- if (killMe!=null) {
- String msg = "Tasktracker running out of space. Killing task.";
- LOG.info(killMe.getTask().getTaskId() + ": " + msg);
- killMe.reportDiagnosticInfo(msg);
- try {
- killMe.killAndCleanup(true);
- } catch (IOException ie) {
- LOG.info("Problem cleaning task up: " +
- StringUtils.stringifyException(ie));
- }
- }
- }
+ /**
+ * Check to see if there are any new tasks that we should run.
+ * @throws IOException
+ */
+ private void checkForNewTasks() throws IOException {
+ //
+ // Check if we should ask for a new Task
+ //
+ if ((mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) &&
+ acceptNewTasks) {
+ checkLocalDirs(fConf.getLocalDirs());
+
+ if (enoughFreeSpace(minSpaceStart)) {
+ Task t = jobClient.pollForNewTask(taskTrackerName);
+ if (t != null) {
+ startNewTask(t);
+ }
+ }
+ }
+ }
+
+ /**
+ * Kill any tasks that have not reported progress in the last X seconds.
+ */
+ private synchronized void markUnresponsiveTasks() throws IOException {
+ long now = System.currentTimeMillis();
+ for (TaskInProgress tip: runningTasks.values()) {
+ long timeSinceLastReport = now - tip.getLastProgressReport();
+ if ((tip.getRunState() == TaskStatus.State.RUNNING) &&
+ (timeSinceLastReport > this.taskTimeout) &&
+ !tip.wasKilled) {
+ String msg = "Task failed to report status for " +
+ (timeSinceLastReport / 1000) +
+ " seconds. Killing.";
+ LOG.info(tip.getTask().getTaskId() + ": " + msg);
+ ReflectionUtils.logThreadInfo(LOG, "lost task", 30);
+ tip.reportDiagnosticInfo(msg);
+ tasksToCleanup.put(tip);
}
+ }
+ }
-
- //we've cleaned up, resume normal operation
- if (!acceptNewTasks && tasks.isEmpty()) {
- acceptNewTasks=true;
+ /**
+ * Ask the JobTracker if there are any tasks that we should clean up,
+ * either because we don't need them any more or because the job is done.
+ */
+ private void closeCompletedTasks() throws IOException {
+ String[] toCloseIds = jobClient.pollForTaskWithClosedJob(taskTrackerName);
+ if (toCloseIds != null) {
+ synchronized (this) {
+ for (int i = 0; i < toCloseIds.length; i++) {
+ TaskInProgress tip = tasks.get(toCloseIds[i]);
+ if (tip != null) {
+ // remove the task from running jobs, removing the job if
+ // it is the last task
+ removeTaskFromJob(tip.getTask().getJobId(), tip);
+ tasksToCleanup.put(tip);
+ } else {
+ LOG.info("Attempt to cleanup unknown tip " + toCloseIds[i]);
}
+ }
}
+ }
+ }
- return 0;
+ /** Check if we're dangerously low on disk space
+ * If so, kill jobs to free up space and make sure
+ * we don't accept any new tasks
+ * Try killing the reduce jobs first, since I believe they
+ * use up most space
+ * Then pick the one with least progress
+ */
+ private void killOverflowingTasks() throws IOException {
+ if (!enoughFreeSpace(minSpaceKill)) {
+ acceptNewTasks=false;
+ //we give up! do not accept new tasks until
+ //all the ones running have finished and they're all cleared up
+ synchronized (this) {
+ TaskInProgress killMe = findTaskToKill();
+
+ if (killMe!=null) {
+ String msg = "Tasktracker running out of space." +
+ " Killing task.";
+ LOG.info(killMe.getTask().getTaskId() + ": " + msg);
+ killMe.reportDiagnosticInfo(msg);
+ tasksToCleanup.put(killMe);
+ }
+ }
+ }
}
+
+ /**
+ * Pick a task to kill to free up space
+ * @return the task to kill or null, if one wasn't found
+ */
+ private TaskInProgress findTaskToKill() {
+ TaskInProgress killMe = null;
+ for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) {
+ TaskInProgress tip = (TaskInProgress) it.next();
+ if ((tip.getRunState() == TaskStatus.State.RUNNING) &&
+ !tip.wasKilled) {
+
+ if (killMe == null) {
+ killMe = tip;
+
+ } else if (!tip.getTask().isMapTask()) {
+ //reduce task, give priority
+ if (killMe.getTask().isMapTask() ||
+ (tip.getTask().getProgress().get() <
+ killMe.getTask().getProgress().get())) {
+
+ killMe = tip;
+ }
+
+ } else if (killMe.getTask().isMapTask() &&
+ tip.getTask().getProgress().get() <
+ killMe.getTask().getProgress().get()) {
+ //map task, only add if the progress is lower
+ killMe = tip;
+ }
+ }
+ }
+ return killMe;
+ }
+
/**
* Check if all of the local directories have enough
* free space
@@ -640,6 +645,9 @@
* @throws IOException
*/
private boolean enoughFreeSpace(long minSpace) throws IOException {
+ if (minSpace == 0) {
+ return true;
+ }
String[] localDirs = fConf.getLocalDirs();
for (int i = 0; i < localDirs.length; i++) {
DF df = null;
@@ -704,7 +712,7 @@
// This while-loop attempts reconnects if we get network errors
while (running && ! staleState && !shuttingDown ) {
try {
- if (offerService() == STALE_STATE) {
+ if (offerService() == State.STALE) {
staleState = true;
}
} catch (Exception ex) {
@@ -722,11 +730,12 @@
close();
}
if (shuttingDown) { return; }
- LOG.info("Reinitializing local state");
+ LOG.warn("Reinitializing local state");
initialize();
}
} catch (IOException iex) {
- LOG.info("Got fatal exception while reinitializing TaskTracker: " + iex.toString());
+ LOG.error("Got fatal exception while reinitializing TaskTracker: " +
+ StringUtils.stringifyException(iex));
return;
}
}
@@ -811,7 +820,8 @@
progress, runstate,
diagnosticInfo.toString(),
"initializing",
- getName(), task.isMapTask()?Phase.MAP:Phase.SHUFFLE);
+ getName(), task.isMapTask()? TaskStatus.Phase.MAP:
+ TaskStatus.Phase.SHUFFLE);
keepJobFiles = false;
}
@@ -884,17 +894,18 @@
/**
* The task is reporting its progress
*/
- public synchronized void reportProgress(float p, String state, Phase newPhase) {
+ public synchronized void reportProgress(float p, String state,
+ TaskStatus.Phase newPhase) {
LOG.info(task.getTaskId()+" "+p+"% "+state);
this.progress = p;
this.runstate = TaskStatus.State.RUNNING;
this.lastProgressReport = System.currentTimeMillis();
- Phase oldPhase = taskStatus.getPhase() ;
+ TaskStatus.Phase oldPhase = taskStatus.getPhase() ;
if( oldPhase != newPhase ){
// sort phase started
- if( newPhase == Phase.SORT ){
+ if( newPhase == TaskStatus.Phase.SORT ){
this.taskStatus.setShuffleFinishTime(System.currentTimeMillis());
- }else if( newPhase == Phase.REDUCE){
+ }else if( newPhase == TaskStatus.Phase.REDUCE){
this.taskStatus.setSortFinishTime(System.currentTimeMillis());
}
}
@@ -1068,6 +1079,16 @@
JOBCACHE + Path.SEPARATOR + task.getJobId() + Path.SEPARATOR +
taskId);
}
+
+ public boolean equals(Object obj) {
+ return (obj instanceof TaskInProgress) &&
+ task.getTaskId().equals
+ (((TaskInProgress) obj).getTask().getTaskId());
+ }
+
+ public int hashCode() {
+ return task.getTaskId().hashCode();
+ }
}
@@ -1089,7 +1110,10 @@
/**
* Called periodically to report Task progress, from 0.0 to 1.0.
*/
- public synchronized void progress(String taskid, float progress, String state, Phase phase) throws IOException {
+ public synchronized void progress(String taskid, float progress,
+ String state,
+ TaskStatus.Phase phase
+ ) throws IOException {
TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
if (tip != null) {
tip.reportProgress(progress, state, phase);
@@ -1150,7 +1174,7 @@
tip.taskFinished();
synchronized(finishedCount) {
finishedCount[0]++;
- finishedCount.notifyAll();
+ finishedCount.notify();
}
} else {
LOG.warn("Unknown child task finshed: "+taskid+". Ignored.");
@@ -1176,8 +1200,14 @@
static class RunningJob{
Path jobFile;
// keep this for later use
- ArrayList tasks;
+ Set<TaskInProgress> tasks;
boolean localized;
+
+ RunningJob(Path jobFile) {
+ localized = false;
+ tasks = new HashSet();
+ this.jobFile = jobFile;
+ }
}
/**
@@ -1239,7 +1269,7 @@
LOG.info("Ping exception: " + msg);
remainingRetries -=1;
if (remainingRetries == 0) {
- getCallStacks();
+ ReflectionUtils.logThreadInfo(LOG, "ping exception", 0);
LOG.warn("Last retry, killing "+taskid);
System.exit(65);
}
@@ -1329,9 +1359,11 @@
System.out.println("usage: TaskTracker");
System.exit(-1);
}
-
try {
JobConf conf=new JobConf();
+ // enable the server to track time spent waiting on locks
+ ReflectionUtils.setContentionTracing
+ (conf.getBoolean("tasktracker.contention.tracking", false));
new TaskTracker(conf).run();
} catch (IOException e) {
LOG.warn( "Can not start task tracker because "+
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java?view=diff&rev=467488&r1=467487&r2=467488
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java Tue Oct 24 14:18:34 2006
@@ -42,25 +42,26 @@
String host;
int httpPort;
int failures;
- Vector taskReports;
+ List<TaskStatus> taskReports;
volatile long lastSeen;
/**
*/
public TaskTrackerStatus() {
+ taskReports = new ArrayList();
}
/**
*/
public TaskTrackerStatus(String trackerName, String host,
- int httpPort, Vector taskReports, int failures) {
+ int httpPort, List<TaskStatus> taskReports,
+ int failures) {
this.trackerName = trackerName;
this.host = host;
this.httpPort = httpPort;
- this.taskReports = new Vector();
- this.taskReports.addAll(taskReports);
+ this.taskReports = new ArrayList(taskReports);
this.failures = failures;
}
@@ -167,7 +168,6 @@
this.host = UTF8.readString(in);
this.httpPort = in.readInt();
- taskReports = new Vector();
taskReports.clear();
int numTasks = in.readInt();
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?view=diff&rev=467488&r1=467487&r2=467488
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Tue Oct 24 14:18:34 2006
@@ -37,7 +37,8 @@
* @param state description of task's current state
* @param phase current phase of the task.
*/
- void progress(String taskid, float progress, String state, Phase phase)
+ void progress(String taskid, float progress, String state,
+ TaskStatus.Phase phase)
throws IOException;
/** Report error messages back to parent. Calls should be sparing, since all
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java?view=diff&rev=467488&r1=467487&r2=467488
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java Tue Oct 24 14:18:34 2006
@@ -17,7 +17,11 @@
package org.apache.hadoop.util;
import java.lang.reflect.Constructor;
+import java.io.*;
+import java.lang.management.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;
@@ -54,5 +58,87 @@
}
}
return result;
+ }
+
+ static private ThreadMXBean threadBean =
+ ManagementFactory.getThreadMXBean();
+
+ public static void setContentionTracing(boolean val) {
+ threadBean.setThreadContentionMonitoringEnabled(val);
+ }
+
+ private static String getTaskName(long id, String name) {
+ if (name == null) {
+ return Long.toString(id);
+ }
+ return id + " (" + name + ")";
+ }
+
+ /**
+ * Print all of the threads' information and stack traces.
+ * @author Owen O'Malley
+ * @param stream the stream to print the thread dump to
+ * @param title a string title for the stack trace
+ */
+ public static void printThreadInfo(PrintWriter stream,
+ String title) {
+ final int STACK_DEPTH = 20;
+ boolean contention = threadBean.isThreadContentionMonitoringEnabled();
+ long[] threadIds = threadBean.getAllThreadIds();
+ stream.println("Process Thread Dump: " + title);
+ stream.println(threadIds.length + " active threads");
+ for (long tid: threadIds) {
+ ThreadInfo info = threadBean.getThreadInfo(tid, STACK_DEPTH);
+ if (info == null) {
+ stream.println(" Inactive");
+ continue;
+ }
+ stream.println("Thread " +
+ getTaskName(info.getThreadId(),
+ info.getThreadName()) + ":");
+ Thread.State state = info.getThreadState();
+ stream.println(" State: " + state);
+ stream.println(" Blocked count: " + info.getBlockedCount());
+ stream.println(" Waited count: " + info.getWaitedCount());
+ if (contention) {
+ stream.println(" Blocked time: " + info.getBlockedTime());
+ stream.println(" Waited time: " + info.getWaitedTime());
+ }
+ if (state == Thread.State.WAITING) {
+ stream.println(" Waiting on " + info.getLockName());
+ } else if (state == Thread.State.BLOCKED) {
+ stream.println(" Blocked on " + info.getLockName());
+ stream.println(" Blocked by " +
+ getTaskName(info.getLockOwnerId(),
+ info.getLockOwnerName()));
+ }
+ stream.println(" Stack:");
+ for (StackTraceElement frame: info.getStackTrace()) {
+ stream.println(" " + frame.toString());
+ }
+ }
+ stream.flush();
+ }
+
+ private static long previousLogTime = 0;
+
+ /**
+ * Log the current thread stacks at INFO level.
+ * @param log the logger that logs the stack trace
+ * @param title a descriptive title for the call stacks
+ * @param minInterval the minimum time, in seconds, since the last thread dump was logged
+ */
+ public static synchronized void logThreadInfo(Log log,
+ String title,
+ long minInterval) {
+ if (log.isInfoEnabled()) {
+ long now = System.currentTimeMillis();
+ if (now - previousLogTime >= minInterval * 1000) {
+ previousLogTime = now;
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ printThreadInfo(new PrintWriter(buffer), title);
+ log.info(buffer.toString());
+ }
+ }
}
}