You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/08/09 00:24:02 UTC
svn commit: r429858 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/mapred/
Author: cutting
Date: Tue Aug 8 15:24:01 2006
New Revision: 429858
URL: http://svn.apache.org/viewvc?rev=429858&view=rev
Log:
HADOOP-427. New bug number for this patch.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=429858&r1=429857&r2=429858&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Aug 8 15:24:01 2006
@@ -3,7 +3,7 @@
Trunk (unreleased changes)
- 1. HADOOP-415. Replace some uses of DatanodeDescriptor in the DFS
+ 1. HADOOP-427. Replace some uses of DatanodeDescriptor in the DFS
web UI code with DatanodeInfo, the preferred public class.
(Devaraj Das via cutting)
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=429858&r1=429857&r2=429858&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Aug 8 15:24:01 2006
@@ -280,7 +280,7 @@
} else if (status.getRunState() == TaskStatus.FAILED) {
// Tell the job to fail the relevant task
failedTask(tip, status.getTaskId(), status, status.getTaskTracker(),
- wasRunning, wasComplete);
+ wasRunning, wasComplete, metrics);
}
}
@@ -520,7 +520,7 @@
}
}
}
-
+
//
// If all tasks are complete, then the job is done!
//
@@ -571,7 +571,8 @@
*/
private void failedTask(TaskInProgress tip, String taskid,
TaskStatus status, String trackerName,
- boolean wasRunning, boolean wasComplete) {
+ boolean wasRunning, boolean wasComplete,
+ JobTrackerMetrics metrics) {
tip.failedSubTask(taskid, trackerName);
boolean isRunning = tip.isRunning();
boolean isComplete = tip.isComplete();
@@ -596,8 +597,10 @@
// the failed task goes to the end of the list.
if (tip.isMapTask()) {
firstMapToTry = (tip.getIdWithinJob() + 1) % maps.length;
+ metrics.failedMap();
} else {
firstReduceToTry = (tip.getIdWithinJob() + 1) % reduces.length;
+ metrics.failedReduce();
}
//
@@ -695,4 +698,5 @@
}
return null;
}
+
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=429858&r1=429857&r2=429858&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Aug 8 15:24:01 2006
@@ -219,6 +219,20 @@
// tracker has already been destroyed.
if (newProfile != null) {
if (now - newProfile.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL) {
+ // But save the state so that if, at a later
+ // point in time, we happen to hear from the
+ // same TaskTracker, we can reinstate
+ // the state
+ ExpiredTaskTrackerState
+ expTaskTrackerState =
+ new ExpiredTaskTrackerState(
+ leastRecent.getTrackerName());
+ if (LOG.isDebugEnabled())
+ LOG.debug("Saving state of TaskTracker " +
+ leastRecent.getTrackerName());
+ expiredTaskTrackerStates.put(
+ leastRecent.getTrackerName(),
+ expTaskTrackerState);
// Remove completely
updateTaskTrackerStatus(trackerName, null);
lostTaskTracker(leastRecent.getTrackerName(),
@@ -347,6 +361,11 @@
Metrics.report(metricsRecord, "maps-completed",
++numMapTasksCompleted);
}
+
+ synchronized void failedMap() {
+ Metrics.report(metricsRecord, "maps-completed",
+ --numMapTasksCompleted);
+ }
synchronized void launchReduce() {
Metrics.report(metricsRecord, "reduces-launched",
@@ -357,6 +376,11 @@
Metrics.report(metricsRecord, "reduces-completed",
++numReduceTasksCompleted);
}
+
+ synchronized void failedReduce() {
+ Metrics.report(metricsRecord, "reduces-completed",
+ --numReduceTasksCompleted);
+ }
synchronized void submitJob() {
Metrics.report(metricsRecord, "jobs-submitted",
@@ -427,6 +451,7 @@
Thread initJobsThread = null;
ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks();
Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks);
+ private TreeMap expiredTaskTrackerStates = new TreeMap();
/**
* It might seem like a bug to maintain a TreeSet of status objects,
@@ -599,6 +624,36 @@
LOG.info("stopped all jobtracker services");
return;
}
+
+ boolean reinstateStateOfTaskTracker(String trackerName) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Going to reinstate state of tasktracker " + trackerName);
+ ExpiredTaskTrackerState e = (ExpiredTaskTrackerState)
+ expiredTaskTrackerStates.get(trackerName);
+ if (e == null) return false;
+ Set taskset = e.getTaskSet();
+ if (taskset == null) return true;
+ for (Iterator it = taskset.iterator(); it.hasNext(); ) {
+ String taskId = (String) it.next();
+ TaskInProgress tip = e.getTIP(taskId);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Going to recreate task entry for task " + taskId);
+ //check whether the job is still running
+ if (tip != null &&
+ tip.getJob().getStatus().getRunState() == JobStatus.RUNNING)
+ createTaskEntry(taskId, trackerName, tip);
+ }
+ ArrayList completedTasks = e.getCompletedTasks();
+ for (int i = 0; i < completedTasks.size(); i++) {
+ TaskStatus ts = (TaskStatus)completedTasks.get(i);
+ TaskInProgress tip = (TaskInProgress)taskidToTIPMap.get(ts.getTaskId());
+ if (tip == null) continue;
+ JobInProgress j = tip.getJob();
+ if (j != null && j.getStatus().getRunState() == JobStatus.RUNNING)
+ j.updateTaskStatus(tip, ts, myMetrics);
+ }
+ return true;
+ }
///////////////////////////////////////////////////////
// Maintain lookup tables; called by JobInProgress
@@ -748,7 +803,11 @@
} else {
// If not first contact, there should be some record of the tracker
if (!seenBefore) {
- return InterTrackerProtocol.UNKNOWN_TASKTRACKER;
+ if (!reinstateStateOfTaskTracker(trackerName))
+ return InterTrackerProtocol.UNKNOWN_TASKTRACKER;
+ else
+ trackerExpiryQueue.add(trackerStatus);
+
}
}
@@ -1196,5 +1255,68 @@
Configuration conf=new Configuration();
startTracker(conf);
+ }
+
+ private class ExpiredTaskTrackerState {
+ //Map from taskId (assigned to a given tasktracker) to the taskId's TIP
+ private TreeMap trackerTaskIdToTIPMap = new TreeMap();
+ //completedTasks is an array list that contains the list of tasks that a
+ //tasktracker successfully completed
+ ArrayList completedTasks = new ArrayList();
+
+ public ExpiredTaskTrackerState(String trackerId) {
+ trackerTaskIdToTIPMap.clear();
+ completedTasks.clear();
+ TreeSet tasks = (TreeSet) trackerToTaskMap.get(trackerId);
+ if (tasks == null) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("This tasktracker has no tasks");
+ return;
+ }
+ if (LOG.isDebugEnabled())
+ LOG.debug("Task IDs that this tasktracker has: ");
+ //We save the status of completed tasks only since TaskTrackers don't
+ //send updates about completed tasks. We don't need to save the status
+ //of other tasks since the TaskTracker will send the update along
+ //with the heartbeat (whenever that happens).
+ //Saving the status of completed tasks is required since the JobTracker
+ //will mark all tasks that belonged to a given TaskTracker as failed
+ //if that TaskTracker is lost. Now, if that same TaskTracker reports
+ //in later on, we can simply re-mark the completed tasks (TIPs really)
+ //it reported on earlier as "completed" and avoid unnecessarily
+ //re-running those tasks.
+ for (Iterator it = tasks.iterator(); it.hasNext(); ) {
+ String taskId = (String) it.next();
+ if (LOG.isDebugEnabled())
+ LOG.debug(taskId);
+ TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId);
+ if (tip !=null &&
+ tip.getJob().getStatus().getRunState() == JobStatus.RUNNING)
+ trackerTaskIdToTIPMap.put(taskId, tip);
+ else continue;
+ TaskStatus ts = tip.getTaskStatus(taskId);
+ //ts could be null for a recently assigned task, in the case where
+ //the tasktracker hasn't yet reported status for that task
+ if (ts == null) continue;
+ if (tip.isComplete()) {
+ TaskStatus saveTS = null;
+ try {
+ saveTS = (TaskStatus)ts.clone();
+ } catch (Exception e) {
+ LOG.fatal("Could not save TaskTracker state",e);
+ }
+ completedTasks.add(saveTS);
+ }
+ }
+ }
+ public Set getTaskSet() {
+ return trackerTaskIdToTIPMap.keySet();
+ }
+ public TaskInProgress getTIP(String taskId) {
+ return (TaskInProgress)trackerTaskIdToTIPMap.get(taskId);
+ }
+ public ArrayList getCompletedTasks() {
+ return completedTasks;
+ }
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=429858&r1=429857&r2=429858&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Aug 8 15:24:01 2006
@@ -283,6 +283,12 @@
taskStatuses.put(taskid, status);
+ //If this task was declared failed because its tasktracker was lost,
+ //but that same tasktracker now reports in with this taskId as
+ //running, we add the taskId to recentTasks
+ if (status.getRunState() == TaskStatus.RUNNING)
+ recentTasks.add(taskid);
+
// Recompute progress
recomputeProgress();
return changed;
@@ -469,5 +475,12 @@
*/
public int getIdWithinJob() {
return partition;
+ }
+
+ /**
+ * Get the TaskStatus associated with a given taskId
+ */
+ public TaskStatus getTaskStatus(String taskId) {
+ return (TaskStatus)taskStatuses.get(taskId);
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=429858&r1=429857&r2=429858&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java Tue Aug 8 15:24:01 2006
@@ -25,7 +25,7 @@
*
* @author Mike Cafarella
**************************************************/
-class TaskStatus implements Writable {
+class TaskStatus implements Writable, Cloneable {
public static final int RUNNING = 0;
public static final int SUCCEEDED = 1;
public static final int FAILED = 2;
@@ -51,6 +51,16 @@
this.diagnosticInfo = diagnosticInfo;
this.stateString = stateString;
this.taskTracker = taskTracker;
+ }
+
+ //Implementing the clone method so that we can save the status of tasks
+ public Object clone() throws CloneNotSupportedException {
+ TaskStatus ts = (TaskStatus)super.clone();
+ if (this.diagnosticInfo != null)
+ ts.diagnosticInfo = new String(this.diagnosticInfo);
+ if (this.stateString != null)
+ ts.stateString = new String(this.stateString);
+ return ts;
}
public String getTaskId() { return taskid; }