You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/05/03 00:05:09 UTC
svn commit: r399065 - in /lucene/hadoop/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapred/JobTracker.java
Author: cutting
Date: Tue May 2 15:05:08 2006
New Revision: 399065
URL: http://svn.apache.org/viewcvs?rev=399065&view=rev
Log:
HADOOP-185. Fix so that, if a task tracker times out making the RPC asking for a new task to run, the job tracker does not think that it is actually running the task returned (but never received). Contributed by Owen.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/CHANGES.txt?rev=399065&r1=399064&r2=399065&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue May 2 15:05:08 2006
@@ -151,6 +151,10 @@
files containing random data. The second sorts the output of the
first. (omalley via cutting)
+40. HADOOP-185. Fix so that, when a task tracker times out making the
+ RPC asking for a new task to run, the job tracker does not think
+ that it is actually running the task returned. (omalley via cutting)
+
Release 0.1.1 - 2006-04-08
1. Added CHANGES.txt, logging all significant changes to Hadoop. (cutting)
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=399065&r1=399064&r2=399065&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Tue May 2 15:05:08 2006
@@ -76,6 +76,93 @@
return tracker;
}
+ /**
+ * A thread to timeout tasks that have been assigned to task trackers,
+ * but that haven't reported back yet.
+ *
+ * After each sleep of TASKTRACKER_EXPIRY_INTERVAL/3 the run() loop walks
+ * launchingTasks from its first entry and, for every task older than
+ * TASKTRACKER_EXPIRY_INTERVAL, records a FAILED TaskStatus on its
+ * TaskInProgress, calls failedTask() on the owning job, and removes the
+ * entry from the map.
+ *
+ * The sweep holds the launchingTasks lock for its whole duration; while
+ * holding it, it briefly takes the JobTracker lock for the
+ * taskidToTIPMap lookup and then the TaskInProgress lock for the status
+ * update.
+ * NOTE(review): addNewTask() and removeTask() also take the
+ * launchingTasks lock; if their callers already hold the JobTracker lock
+ * that is the opposite acquisition order and could deadlock -- confirm
+ * against the call sites.
+ *
+ * Note that I included a stop() method, even though there is no place
+ * where JobTrackers are cleaned up.
+ * @author Owen O'Malley
+ */
+ private class ExpireLaunchingTasks implements Runnable {
+ private volatile boolean shouldRun = true; // cleared by stop(), tested once per sweep
+ /**
+ * This is a map of the tasks that have been assigned to task trackers,
+ * but that have not yet been seen in a status report.
+ * map: task-id (String) -> time-assigned (Long)
+ *
+ * A LinkedHashMap is used so that iteration follows insertion order;
+ * run() depends on that order matching assignment time so it can stop
+ * at the first entry that is still young enough.
+ * NOTE(review): putting an id that is already present updates its time
+ * but keeps its original position -- presumably each task id is added
+ * only once; confirm.
+ * All access is guarded by synchronizing on the map itself.
+ */
+ private Map launchingTasks = new LinkedHashMap();
+ private static final String errorMsg = "Error launching task"; // passed twice to the TaskStatus built for a timed-out task
+ private static final String errorHost = "n/a"; // host placeholder given to TaskStatus and failedTask()
+ /**
+ * Sweeps launchingTasks until stop() is called or the thread is
+ * interrupted, failing every task whose launch has timed out.
+ */
+ public void run() {
+ try {
+ while (shouldRun) {
+ // Sleep for a third of the tracker expiry interval (the original
+ // note says "every 3 minutes"), then check for overdue tasks
+ Thread.sleep(TASKTRACKER_EXPIRY_INTERVAL/3);
+ long now = System.currentTimeMillis();
+ LOG.fine("Starting launching task sweep");
+ synchronized (launchingTasks) {
+ Iterator itr = launchingTasks.entrySet().iterator();
+ while (itr.hasNext()) {
+ Map.Entry pair = (Map.Entry) itr.next();
+ String taskId = (String) pair.getKey();
+ long age = now - ((Long) pair.getValue()).longValue();
+ LOG.fine(taskId + " is " + age + " ms old.");
+ if (age > TASKTRACKER_EXPIRY_INTERVAL) {
+ LOG.info("Launching task " + taskId + " timed out.");
+ TaskInProgress tip = null;
+ synchronized (JobTracker.this) {
+ tip = (TaskInProgress) taskidToTIPMap.get(taskId);
+ }
+ if (tip != null) {
+ synchronized (tip) {
+ JobInProgress job = tip.getJob();
+ // record why the task failed, so that the user can
+ // see the problem
+ TaskStatus status =
+ new TaskStatus(taskId,
+ tip.isMapTask(),
+ 0.0f,
+ TaskStatus.FAILED,
+ errorMsg,
+ errorMsg,
+ errorHost);
+ tip.updateStatus(status);
+ job.failedTask(tip, taskId, errorHost);
+ }
+ }
+ itr.remove(); // the entry is dropped even when no TaskInProgress was found
+ } else {
+ // the tasks are sorted by start time, so once we find
+ // one that we want to keep, we are done for this cycle.
+ break;
+ }
+ }
+ }
+ }
+ } catch (InterruptedException ie) {
+ // interrupted: the loop is abandoned and run() returns
+ }
+ }
+ /**
+ * Records the current time as the assignment time of the given task.
+ * @param taskName id of the task to start tracking
+ */
+ public void addNewTask(String taskName) {
+ synchronized (launchingTasks) {
+ launchingTasks.put(taskName,
+ Long.valueOf(System.currentTimeMillis()));
+ }
+ }
+ /**
+ * Stops tracking the given task; does nothing if it is not in the map.
+ * @param taskName id of the task to forget
+ */
+ public void removeTask(String taskName) {
+ synchronized (launchingTasks) {
+ launchingTasks.remove(taskName);
+ }
+ }
+ /**
+ * Asks run() to exit. Only the flag is set, so a sleeping sweep thread
+ * notices the request after its current sleep and sweep have finished.
+ */
+ public void stop() {
+ shouldRun = false;
+ }
+ }
+
///////////////////////////////////////////////////////
// Used to expire TaskTrackers that have gone down
///////////////////////////////////////////////////////
@@ -277,7 +364,9 @@
ExpireTrackers expireTrackers = new ExpireTrackers();
RetireJobs retireJobs = new RetireJobs();
JobInitThread initJobs = new JobInitThread();
-
+ ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks();
+ Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks);
+
/**
* It might seem like a bug to maintain a TreeSet of status objects,
* which can be updated at any time. But that's not what happens! We
@@ -346,12 +435,12 @@
this.port = addr.getPort();
this.interTrackerServer = RPC.getServer(this, addr.getPort(), 10, false, conf);
this.interTrackerServer.start();
- Properties p = System.getProperties();
- for (Iterator it = p.keySet().iterator(); it.hasNext(); ) {
- String key = (String) it.next();
- String val = (String) p.getProperty(key);
- LOG.info("Property '" + key + "' is " + val);
- }
+ Properties p = System.getProperties();
+ for (Iterator it = p.keySet().iterator(); it.hasNext(); ) {
+ String key = (String) it.next();
+ String val = (String) p.getProperty(key);
+ LOG.info("Property '" + key + "' is " + val);
+ }
this.infoPort = conf.getInt("mapred.job.tracker.info.port", 50030);
this.infoServer = new JobTrackerInfoServer(this, infoPort);
@@ -362,6 +451,7 @@
new Thread(this.expireTrackers).start();
new Thread(this.retireJobs).start();
new Thread(this.initJobs).start();
+ expireLaunchingTaskThread.start();
}
public static InetSocketAddress getAddress(Configuration conf) {
@@ -622,7 +712,8 @@
Task t = job.obtainNewMapTask(taskTracker, tts);
if (t != null) {
- return t;
+ expireLaunchingTasks.addNewTask(t.getTaskId());
+ return t;
}
//
@@ -656,7 +747,8 @@
Task t = job.obtainNewReduceTask(taskTracker, tts);
if (t != null) {
- return t;
+ expireLaunchingTasks.addNewTask(t.getTaskId());
+ return t;
}
//
@@ -878,10 +970,12 @@
for (Iterator it = status.taskReports(); it.hasNext(); ) {
TaskStatus report = (TaskStatus) it.next();
report.setHostname(status.getHost());
- TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(report.getTaskId());
+ String taskId = report.getTaskId();
+ TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId);
if (tip == null) {
LOG.info("Serious problem. While updating status, cannot find taskid " + report.getTaskId());
} else {
+ expireLaunchingTasks.removeTask(taskId);
JobInProgress job = tip.getJob();
job.updateTaskStatus(tip, report);