You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 06:38:12 UTC

svn commit: r1077819 - /hadoop/common/branches/branch-0.20-security-202/src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Author: omalley
Date: Fri Mar  4 05:38:11 2011
New Revision: 1077819

URL: http://svn.apache.org/viewvc?rev=1077819&view=rev
Log:
commit 81004aff85bd49975902252cd4a67b3c573c5dec
Author: Arun C Murthy <ac...@apache.org>
Date:   Tue Mar 1 16:46:16 2011 -0800

    Tuning out-of-band heartbeats, introduced a new config: mapreduce.tasktracker.outofband.heartbeat.damper.

Modified:
    hadoop/common/branches/branch-0.20-security-202/src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/common/branches/branch-0.20-security-202/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-202/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077819&r1=1077818&r2=1077819&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-202/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-202/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar  4 05:38:11 2011
@@ -45,6 +45,7 @@ import java.util.Vector;
 import java.util.Map.Entry;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
 import javax.crypto.SecretKey;
@@ -272,9 +273,13 @@ public class TaskTracker implements MRCo
   static final String TT_OUTOFBAND_HEARBEAT =
     "mapreduce.tasktracker.outofband.heartbeat";
   private volatile boolean oobHeartbeatOnTaskCompletion;
+  static final String TT_OUTOFBAND_HEARTBEAT_DAMPER = 
+    "mapreduce.tasktracker.outofband.heartbeat.damper";
+  static private final int DEFAULT_OOB_HEARTBEAT_DAMPER = 1000000;
+  private volatile int oobHeartbeatDamper;
   
   // Track number of completed tasks to send an out-of-band heartbeat
-  private IntWritable finishedCount = new IntWritable(0);
+  private AtomicInteger finishedCount = new AtomicInteger(0);
   
   private MapEventsFetcherThread mapEventsFetcher;
   final int workerThreads;
@@ -298,6 +303,7 @@ public class TaskTracker implements MRCo
    * the minimum interval between jobtracker polls
    */
   private volatile int heartbeatInterval = HEARTBEAT_INTERVAL_MIN;
+  
   /**
    * Number of maptask completion events locations to poll for at one time
    */  
@@ -711,6 +717,10 @@ public class TaskTracker implements MRCo
     
     oobHeartbeatOnTaskCompletion = 
       fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false);
+    
+    oobHeartbeatDamper = 
+      fConf.getInt(TT_OUTOFBAND_HEARTBEAT_DAMPER, 
+          DEFAULT_OOB_HEARTBEAT_DAMPER);
   }
 
   private void createInstrumentation() {
@@ -1356,25 +1366,39 @@ public class TaskTracker implements MRCo
     return recentMapEvents;
   }
 
+  private long getHeartbeatInterval(int numFinishedTasks) {
+    return (heartbeatInterval / (numFinishedTasks * oobHeartbeatDamper + 1));
+  }
+  
   /**
    * Main service loop.  Will stay in this loop forever.
    */
   State offerService() throws Exception {
-    long lastHeartbeat = 0;
+    long lastHeartbeat = System.currentTimeMillis();
 
     while (running && !shuttingDown) {
       try {
         long now = System.currentTimeMillis();
-
-        long waitTime = heartbeatInterval - (now - lastHeartbeat);
-        if (waitTime > 0) {
+        
+        // accelerate to account for multiple finished tasks up-front
+        long remaining = 
+          (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
+        while (remaining > 0) {
           // sleeps for the wait time or 
-          // until there are empty slots to schedule tasks
+          // until there are *enough* empty slots to schedule tasks
           synchronized (finishedCount) {
-            if (finishedCount.get() == 0) {
-              finishedCount.wait(waitTime);
+            finishedCount.wait(remaining);
+            
+            // Recompute
+            now = System.currentTimeMillis();
+            remaining = 
+              (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
+            
+            if (remaining <= 0) {
+              // Reset count 
+              finishedCount.set(0);
+              break;
             }
-            finishedCount.set(0);
           }
         }
 
@@ -2142,8 +2166,7 @@ public class TaskTracker implements MRCo
   private void notifyTTAboutTaskCompletion() {
     if (oobHeartbeatOnTaskCompletion) {
       synchronized (finishedCount) {
-        int value = finishedCount.get();
-        finishedCount.set(value+1);
+        finishedCount.incrementAndGet();
         finishedCount.notify();
       }
     }