You are viewing a plain-text version of this content. The canonical link to it was not preserved in this plain-text version.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/12/22 05:23:57 UTC

svn commit: r728602 - in /hadoop/core/branches/branch-0.20: CHANGES.txt src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java src/mapred/org/apache/hadoop/mapred/JobTracker.java src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Author: ddas
Date: Sun Dec 21 20:23:56 2008
New Revision: 728602

URL: http://svn.apache.org/viewvc?rev=728602&view=rev
Log:
Merge -r 728600:728601 from trunk onto 0.20 branch. Fixes HADOOP-4869.

Modified:
    hadoop/core/branches/branch-0.20/CHANGES.txt
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=728602&r1=728601&r2=728602&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Sun Dec 21 20:23:56 2008
@@ -476,6 +476,10 @@
 
     HADOOP-4889. Fix permissions in RPM packaging. (Eric Yang via cdouglas)
 
+    HADOOP-4869. Fixes the TT-JT heartbeat to have an explicit flag for 
+    restart apart from the initialContact flag that there was earlier.
+    (Amareshwari Sriramadasu via ddas)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=728602&r1=728601&r2=728602&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java Sun Dec 21 20:23:56 2008
@@ -30,7 +30,7 @@
   /**
    * version 3 introduced to replace 
    * emitHearbeat/pollForNewTask/pollForTaskWithClosedJob with
-   * {@link #heartbeat(TaskTrackerStatus, boolean, boolean, short)}
+   * {@link #heartbeat(TaskTrackerStatus, boolean, boolean, boolean, short)}
    * version 4 changed TaskReport for HADOOP-549.
    * version 5 introduced that removes locateMapOutputs and instead uses
    * getTaskCompletionEvents to figure finished maps and fetch the outputs
@@ -57,8 +57,10 @@
    *             (HADOOP-4035)
    * Version 22: Replaced parameter 'initialContact' with 'restarted' 
    *             in heartbeat method (HADOOP-4305) 
+   * Version 23: Added parameter 'initialContact' again in heartbeat method
+   *            (HADOOP-4869) 
    */
-  public static final long versionID = 22L;
+  public static final long versionID = 23L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;
@@ -77,6 +79,8 @@
    * @param status the status update
    * @param restarted <code>true</code> if the process has just started or 
    *                   restarted, <code>false</code> otherwise
+   * @param initialContact <code>true</code> if this is first interaction since
+   *                       'refresh', <code>false</code> otherwise.
    * @param acceptNewTasks <code>true</code> if the {@link TaskTracker} is
    *                       ready to accept new tasks to run.                 
    * @param responseId the last responseId successfully acted upon by the
@@ -86,6 +90,7 @@
    */
   HeartbeatResponse heartbeat(TaskTrackerStatus status, 
                               boolean restarted, 
+                              boolean initialContact,
                               boolean acceptNewTasks,
                               short responseId)
     throws IOException;

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=728602&r1=728601&r2=728602&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java Sun Dec 21 20:23:56 2008
@@ -2197,11 +2197,13 @@
    */
   public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, 
                                                   boolean restarted,
+                                                  boolean initialContact,
                                                   boolean acceptNewTasks, 
                                                   short responseId) 
     throws IOException {
     LOG.debug("Got heartbeat from: " + status.getTrackerName() + 
               " (restarted: " + restarted + 
+              " initialContact: " + initialContact + 
               " acceptNewTasks: " + acceptNewTasks + ")" +
               " with responseId: " + responseId);
 
@@ -2225,7 +2227,7 @@
       trackerToHeartbeatResponseMap.get(trackerName);
     boolean addRestartInfo = false;
 
-    if (restarted != true) {
+    if (initialContact != true) {
       // If this isn't the 'initial contact' from the tasktracker,
       // there is something seriously wrong if the JobTracker has
       // no record of the 'previous heartbeat'; if so, ask the 
@@ -2263,7 +2265,10 @@
     // Process this heartbeat 
     short newResponseId = (short)(responseId + 1);
     status.setLastSeen(now);
-    if (!processHeartbeat(status, restarted)) {
+    if (!processHeartbeat(status, initialContact)) {
+      if (prevHeartbeatResponse != null) {
+        trackerToHeartbeatResponseMap.remove(trackerName);
+      }
       return new HeartbeatResponse(newResponseId, 
                    new TaskTrackerAction[] {new ReinitTrackerAction()});
     }
@@ -2426,14 +2431,14 @@
    */
   private synchronized boolean processHeartbeat(
                                  TaskTrackerStatus trackerStatus, 
-                                 boolean restarted) {
+                                 boolean initialContact) {
     String trackerName = trackerStatus.getTrackerName();
 
     synchronized (taskTrackers) {
       synchronized (trackerExpiryQueue) {
         boolean seenBefore = updateTaskTrackerStatus(trackerName,
                                                      trackerStatus);
-        if (restarted) {
+        if (initialContact) {
           // If it's first contact, then clear out 
           // any state hanging around
           if (seenBefore) {
@@ -2443,17 +2448,17 @@
           // If not first contact, there should be some record of the tracker
           if (!seenBefore) {
             LOG.warn("Status from unknown Tracker : " + trackerName);
-            // This is lost tracker that came back now, if it blacklisted
-            // increment the count of blacklisted trackers in the cluster
-            if (isBlacklisted(trackerName)) {
-              faultyTrackers.numBlacklistedTrackers += 1;
-            }
-            addNewTracker(trackerStatus);
+            updateTaskTrackerStatus(trackerName, null);
             return false;
           }
         }
 
-        if (restarted) {
+        if (initialContact) {
+          // if this is lost tracker that came back now, and if it blacklisted
+          // increment the count of blacklisted trackers in the cluster
+          if (isBlacklisted(trackerName)) {
+            faultyTrackers.numBlacklistedTrackers += 1;
+          }
           addNewTracker(trackerStatus);
         }
       }

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=728602&r1=728601&r2=728602&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Sun Dec 21 20:23:56 2008
@@ -162,6 +162,7 @@
   volatile int mapTotal = 0;
   volatile int reduceTotal = 0;
   boolean justStarted = true;
+  boolean justInited = true;
   // Mark reduce tasks that are shuffling to rollback their events index
   Set<TaskAttemptID> shouldReset = new HashSet<TaskAttemptID>();
     
@@ -524,6 +525,7 @@
       RPC.waitForProxy(InterTrackerProtocol.class,
                        InterTrackerProtocol.versionID, 
                        jobTrackAddr, this.fConf);
+    this.justInited = true;
     this.running = true;    
     // start the thread that will fetch map task completion events
     this.mapEventsFetcher = new MapEventsFetcherThread();
@@ -1020,7 +1022,7 @@
         // If the TaskTracker is just starting up:
         // 1. Verify the buildVersion
         // 2. Get the system directory & filesystem
-        if(justStarted){
+        if(justInited) {
           String jobTrackerBV = jobClient.getBuildVersion();
           if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {
             String msg = "Shutting down. Incompatible buildVersion." +
@@ -1096,6 +1098,7 @@
         // resetting heartbeat interval from the response.
         heartbeatInterval = heartbeatResponse.getHeartbeatInterval();
         justStarted = false;
+        justInited = false;
         if (actions != null){ 
           for(TaskTrackerAction action: actions) {
             if (action instanceof LaunchTaskAction) {
@@ -1217,7 +1220,9 @@
     // Xmit the heartbeat
     //
     HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, 
-                                                              justStarted, askForNewTask, 
+                                                              justStarted,
+                                                              justInited,
+                                                              askForNewTask, 
                                                               heartbeatResponseId);
       
     //