You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2009/03/06 07:42:31 UTC
svn commit: r750783 - in /hadoop/core/trunk: ./
src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Author: yhemanth
Date: Fri Mar 6 06:42:30 2009
New Revision: 750783
URL: http://svn.apache.org/viewvc?rev=750783&view=rev
Log:
HADOOP-5338. Fix jobtracker restart to clear task completion events cached by tasktrackers to avoid missing events. Contributed by Amar Kamat.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/HeartbeatResponse.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=750783&r1=750782&r2=750783&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Mar 6 06:42:30 2009
@@ -946,6 +946,10 @@
HADOOP-5145. Balancer sometimes runs out of memory after running days or weeks.
(hairong)
+ HADOOP-5338. Fix jobtracker restart to clear task completion events cached by
+ tasktrackers, forcing them to fetch all events afresh, thus avoiding missed
+ task completion events on the tasktrackers. (Amar Kamat via yhemanth)
+
Release 0.19.2 - Unreleased
BUG FIXES
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/HeartbeatResponse.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/HeartbeatResponse.java?rev=750783&r1=750782&r2=750783&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/HeartbeatResponse.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/HeartbeatResponse.java Fri Mar 6 06:42:30 2009
@@ -21,8 +21,10 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.HashSet;
import java.util.Map;
import java.util.HashMap;
+import java.util.Set;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
@@ -39,7 +41,7 @@
short responseId;
int heartbeatInterval;
TaskTrackerAction[] actions;
- Map<JobID, Integer> lastKnownIndexMap = null;
+ Set<JobID> recoveredJobs = new HashSet<JobID>();
HeartbeatResponse() {}
@@ -57,12 +59,12 @@
return responseId;
}
- public void setLastKnownIndices(Map<JobID, Integer> lastKnownIndexMap) {
- this.lastKnownIndexMap = lastKnownIndexMap;
+ public void setRecoveredJobs(Set<JobID> ids) {
+ recoveredJobs = ids;
}
- public Map<JobID, Integer> getLastKnownIndex() {
- return lastKnownIndexMap;
+ public Set<JobID> getRecoveredJobs() {
+ return recoveredJobs;
}
public void setActions(TaskTrackerAction[] actions) {
@@ -101,17 +103,11 @@
action.write(out);
}
}
- // Write the last map event index for the jobs
- if (lastKnownIndexMap != null) {
- out.writeInt(lastKnownIndexMap.size());
- for (Map.Entry<JobID, Integer> entry : lastKnownIndexMap.entrySet()) {
- entry.getKey().write(out);
- out.writeInt(entry.getValue());
- }
- } else {
- out.writeInt(0);
+ // Write the job ids of the jobs that were recovered
+ out.writeInt(recoveredJobs.size());
+ for (JobID id : recoveredJobs) {
+ id.write(out);
}
- //ObjectWritable.writeObject(out, actions, actions.getClass(), conf);
}
public void readFields(DataInput in) throws IOException {
@@ -129,17 +125,12 @@
} else {
actions = null;
}
- // Read the last map events index of the jobs
+ // Read the job ids of the jobs that were recovered
int size = in.readInt();
- if (size != 0) {
- lastKnownIndexMap = new HashMap<JobID, Integer>(size);
- for (int i = 0; i < size; ++i) {
- JobID id = new JobID();
- id.readFields(in);
- int count = in.readInt();
- lastKnownIndexMap.put(id, count);
- }
+ for (int i = 0; i < size; ++i) {
+ JobID id = new JobID();
+ id.readFields(in);
+ recoveredJobs.add(id);
}
- //actions = (TaskTrackerAction[]) ObjectWritable.readObject(in, conf);
}
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=750783&r1=750782&r2=750783&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java Fri Mar 6 06:42:30 2009
@@ -60,8 +60,9 @@
* Version 23: Added parameter 'initialContact' again in heartbeat method
* (HADOOP-4869)
* Version 24: Changed format of Task and TaskStatus for HADOOP-4759
+ * Version 25: JobIDs are passed in response to JobTracker restart
*/
- public static final long versionID = 24L;
+ public static final long versionID = 25L;
public final static int TRACKERS_OK = 0;
public final static int UNKNOWN_TASKTRACKER = 1;
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=750783&r1=750782&r2=750783&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 6 06:42:30 2009
@@ -848,6 +848,10 @@
return jobsToRecover.size() != 0;
}
+ Set<JobID> getJobsToRecover() {
+ return jobsToRecover;
+ }
+
/** Check if the given string represents a job-id or not
*/
private boolean isJobNameValid(String str) {
@@ -1127,7 +1131,9 @@
long recoveryStartTime = System.currentTimeMillis();
// II. Recover each job
- for (JobID id : jobsToRecover) {
+ idIter = jobsToRecover.iterator();
+ while (idIter.hasNext()) {
+ JobID id = idIter.next();
JobInProgress pJob = getJob(id);
// 1. Get the required info
@@ -1169,11 +1175,9 @@
+ id + ". Ignoring it.", ioe);
}
- // 6. Inform the jobtracker as to how much of the data is recovered.
- // This is done so that TT should rollback to account for lost
- // updates
- lastSeenEventMapOnRestart.put(pJob.getStatus().getJobID(),
- pJob.getNumTaskCompletionEvents());
+ if (pJob.isComplete()) {
+ idIter.remove(); // no need to keep this job info as its successful
+ }
}
recoveryDuration = System.currentTimeMillis() - recoveryStartTime;
@@ -1198,8 +1202,6 @@
trackerExpiryQueue.add(status);
}
- // IV. Cleanup
- jobsToRecover.clear();
LOG.info("Restoration complete");
}
@@ -1277,10 +1279,6 @@
Map<String, Node> hostnameToNodeMap =
Collections.synchronizedMap(new TreeMap<String, Node>());
- // A map from JobID to the last known task-completion-event-index on restart
- Map<JobID, Integer> lastSeenEventMapOnRestart =
- new HashMap<JobID, Integer>();
-
// Number of resolved entries
int numResolved;
@@ -2375,7 +2373,7 @@
// check if the restart info is req
if (addRestartInfo) {
- response.setLastKnownIndices(lastSeenEventMapOnRestart);
+ response.setRecoveredJobs(recoveryManager.getJobsToRecover());
}
// Update the trackerToHeartbeatResponseMap
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=750783&r1=750782&r2=750783&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar 6 06:42:30 2009
@@ -725,17 +725,14 @@
}
/**
- * Check if the number of events that are obtained are more than required.
- * If yes then purge the extra ones.
+ * Reset the events obtained so far.
*/
- public void purgeMapEvents(int lastKnownIndex) {
+ public void reset() {
// Note that the sync is first on fromEventId and then on allMapEvents
synchronized (fromEventId) {
synchronized (allMapEvents) {
- if (allMapEvents.size() > lastKnownIndex) {
- fromEventId.set(lastKnownIndex);
- allMapEvents = allMapEvents.subList(0, lastKnownIndex);
- }
+ fromEventId.set(0); // set the new index for TCE
+ allMapEvents.clear();
}
}
}
@@ -1094,19 +1091,19 @@
// Check if the map-event list needs purging
- if (heartbeatResponse.getLastKnownIndex() != null) {
+ Set<JobID> jobs = heartbeatResponse.getRecoveredJobs();
+ if (jobs.size() > 0) {
synchronized (this) {
// purge the local map events list
- for (Map.Entry<JobID, Integer> entry
- : heartbeatResponse.getLastKnownIndex().entrySet()) {
+ for (JobID job : jobs) {
RunningJob rjob;
synchronized (runningJobs) {
- rjob = runningJobs.get(entry.getKey());
+ rjob = runningJobs.get(job);
if (rjob != null) {
synchronized (rjob) {
FetchStatus f = rjob.getFetchStatus();
if (f != null) {
- f.purgeMapEvents(entry.getValue());
+ f.reset();
}
}
}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java?rev=750783&r1=750782&r2=750783&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java Fri Mar 6 06:42:30 2009
@@ -25,6 +25,8 @@
import junit.framework.TestCase;
import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
/**
* TestJobTrackerRestart checks if the jobtracker can restart. JobTracker
@@ -255,18 +257,6 @@
// Test if all the events that were recovered match exactly
testTaskCompletionEvents(prevEvents, jtEvents, false, numToMatch);
- TaskCompletionEvent[] trackerEvents;
- while(true) {
- trackerEvents =
- mr.getMapTaskCompletionEventsUpdates(0, id, 2 * numMaps)
- .getMapTaskCompletionEvents();
- if (trackerEvents.length < jtEvents.length) {
- UtilsForTests.waitFor(100);
- } else {
- break;
- }
- }
-
// Check the task reports
// The reports should match exactly if the attempts are same
TaskReport[] afterMapReports = jobClient.getMapTaskReports(id);
@@ -278,13 +268,36 @@
assertEquals("Job priority change is not reflected",
JobPriority.HIGH, mr.getJobPriority(id));
+ List<TaskCompletionEvent> jtMapEvents =
+ new ArrayList<TaskCompletionEvent>();
+ for (TaskCompletionEvent tce : jtEvents) {
+ if (tce.isMapTask()) {
+ jtMapEvents.add(tce);
+ }
+ }
+
+ TaskCompletionEvent[] trackerEvents;
+ while(true) {
+ // wait for the tracker to pull all the map events
+ trackerEvents =
+ mr.getMapTaskCompletionEventsUpdates(0, id, jtMapEvents.size())
+ .getMapTaskCompletionEvents();
+ if (trackerEvents.length < jtMapEvents.size()) {
+ UtilsForTests.waitFor(1000);
+ } else {
+ break;
+ }
+ }
+
+ testTaskCompletionEvents(jtMapEvents.toArray(new TaskCompletionEvent[0]),
+ trackerEvents, true, -1);
+
// Signal the reduce tasks
UtilsForTests.signalTasks(dfs, fileSys, false, getMapSignalFile(shareDir),
getReduceSignalFile(shareDir));
UtilsForTests.waitTillDone(jobClient);
- testTaskCompletionEvents(jtEvents, trackerEvents, true, 2 * numMaps);
// validate the history file
TestJobHistory.validateJobHistoryFileFormat(id, newConf, "SUCCESS", true);