You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/12/22 05:23:57 UTC
svn commit: r728602 - in /hadoop/core/branches/branch-0.20: CHANGES.txt
src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
src/mapred/org/apache/hadoop/mapred/JobTracker.java
src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Author: ddas
Date: Sun Dec 21 20:23:56 2008
New Revision: 728602
URL: http://svn.apache.org/viewvc?rev=728602&view=rev
Log:
Merge -r 728600:728601 from trunk onto 0.20 branch. Fixes HADOOP-4869.
Modified:
hadoop/core/branches/branch-0.20/CHANGES.txt
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=728602&r1=728601&r2=728602&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Sun Dec 21 20:23:56 2008
@@ -476,6 +476,10 @@
HADOOP-4889. Fix permissions in RPM packaging. (Eric Yang via cdouglas)
+ HADOOP-4869. Fixes the TT-JT heartbeat to carry an explicit flag for
+ restart, separate from the initialContact flag that existed earlier.
+ (Amareshwari Sriramadasu via ddas)
+
Release 0.19.1 - Unreleased
IMPROVEMENTS
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=728602&r1=728601&r2=728602&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java Sun Dec 21 20:23:56 2008
@@ -30,7 +30,7 @@
/**
* version 3 introduced to replace
* emitHeartbeat/pollForNewTask/pollForTaskWithClosedJob with
- * {@link #heartbeat(TaskTrackerStatus, boolean, boolean, short)}
+ * {@link #heartbeat(TaskTrackerStatus, boolean, boolean, boolean, short)}
* version 4 changed TaskReport for HADOOP-549.
* version 5 introduced that removes locateMapOutputs and instead uses
* getTaskCompletionEvents to figure finished maps and fetch the outputs
@@ -57,8 +57,10 @@
* (HADOOP-4035)
* Version 22: Replaced parameter 'initialContact' with 'restarted'
* in heartbeat method (HADOOP-4305)
+ * Version 23: Added parameter 'initialContact' again in heartbeat method
+ * (HADOOP-4869)
*/
- public static final long versionID = 22L;
+ public static final long versionID = 23L;
public final static int TRACKERS_OK = 0;
public final static int UNKNOWN_TASKTRACKER = 1;
@@ -77,6 +79,8 @@
* @param status the status update
* @param restarted <code>true</code> if the process has just started or
* restarted, <code>false</code> otherwise
+ * @param initialContact <code>true</code> if this is the first interaction
+ * since 'refresh', <code>false</code> otherwise.
* @param acceptNewTasks <code>true</code> if the {@link TaskTracker} is
* ready to accept new tasks to run.
* @param responseId the last responseId successfully acted upon by the
@@ -86,6 +90,7 @@
*/
HeartbeatResponse heartbeat(TaskTrackerStatus status,
boolean restarted,
+ boolean initialContact,
boolean acceptNewTasks,
short responseId)
throws IOException;
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=728602&r1=728601&r2=728602&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java Sun Dec 21 20:23:56 2008
@@ -2197,11 +2197,13 @@
*/
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
boolean restarted,
+ boolean initialContact,
boolean acceptNewTasks,
short responseId)
throws IOException {
LOG.debug("Got heartbeat from: " + status.getTrackerName() +
" (restarted: " + restarted +
+ " initialContact: " + initialContact +
" acceptNewTasks: " + acceptNewTasks + ")" +
" with responseId: " + responseId);
@@ -2225,7 +2227,7 @@
trackerToHeartbeatResponseMap.get(trackerName);
boolean addRestartInfo = false;
- if (restarted != true) {
+ if (initialContact != true) {
// If this isn't the 'initial contact' from the tasktracker,
// there is something seriously wrong if the JobTracker has
// no record of the 'previous heartbeat'; if so, ask the
@@ -2263,7 +2265,10 @@
// Process this heartbeat
short newResponseId = (short)(responseId + 1);
status.setLastSeen(now);
- if (!processHeartbeat(status, restarted)) {
+ if (!processHeartbeat(status, initialContact)) {
+ if (prevHeartbeatResponse != null) {
+ trackerToHeartbeatResponseMap.remove(trackerName);
+ }
return new HeartbeatResponse(newResponseId,
new TaskTrackerAction[] {new ReinitTrackerAction()});
}
@@ -2426,14 +2431,14 @@
*/
private synchronized boolean processHeartbeat(
TaskTrackerStatus trackerStatus,
- boolean restarted) {
+ boolean initialContact) {
String trackerName = trackerStatus.getTrackerName();
synchronized (taskTrackers) {
synchronized (trackerExpiryQueue) {
boolean seenBefore = updateTaskTrackerStatus(trackerName,
trackerStatus);
- if (restarted) {
+ if (initialContact) {
// If it's first contact, then clear out
// any state hanging around
if (seenBefore) {
@@ -2443,17 +2448,17 @@
// If not first contact, there should be some record of the tracker
if (!seenBefore) {
LOG.warn("Status from unknown Tracker : " + trackerName);
- // This is lost tracker that came back now, if it blacklisted
- // increment the count of blacklisted trackers in the cluster
- if (isBlacklisted(trackerName)) {
- faultyTrackers.numBlacklistedTrackers += 1;
- }
- addNewTracker(trackerStatus);
+ updateTaskTrackerStatus(trackerName, null);
return false;
}
}
- if (restarted) {
+ if (initialContact) {
+ // If this is a lost tracker that came back now, and if it is blacklisted,
+ // increment the count of blacklisted trackers in the cluster
+ if (isBlacklisted(trackerName)) {
+ faultyTrackers.numBlacklistedTrackers += 1;
+ }
addNewTracker(trackerStatus);
}
}
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=728602&r1=728601&r2=728602&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Sun Dec 21 20:23:56 2008
@@ -162,6 +162,7 @@
volatile int mapTotal = 0;
volatile int reduceTotal = 0;
boolean justStarted = true;
+ boolean justInited = true;
// Mark reduce tasks that are shuffling to rollback their events index
Set<TaskAttemptID> shouldReset = new HashSet<TaskAttemptID>();
@@ -524,6 +525,7 @@
RPC.waitForProxy(InterTrackerProtocol.class,
InterTrackerProtocol.versionID,
jobTrackAddr, this.fConf);
+ this.justInited = true;
this.running = true;
// start the thread that will fetch map task completion events
this.mapEventsFetcher = new MapEventsFetcherThread();
@@ -1020,7 +1022,7 @@
// If the TaskTracker is just starting up:
// 1. Verify the buildVersion
// 2. Get the system directory & filesystem
- if(justStarted){
+ if(justInited) {
String jobTrackerBV = jobClient.getBuildVersion();
if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {
String msg = "Shutting down. Incompatible buildVersion." +
@@ -1096,6 +1098,7 @@
// resetting heartbeat interval from the response.
heartbeatInterval = heartbeatResponse.getHeartbeatInterval();
justStarted = false;
+ justInited = false;
if (actions != null){
for(TaskTrackerAction action: actions) {
if (action instanceof LaunchTaskAction) {
@@ -1217,7 +1220,9 @@
// Xmit the heartbeat
//
HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
- justStarted, askForNewTask,
+ justStarted,
+ justInited,
+ askForNewTask,
heartbeatResponseId);
//