You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 06:24:14 UTC
svn commit: r1077806 -
/hadoop/common/branches/branch-0.20-security-203/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Author: omalley
Date: Fri Mar 4 05:24:13 2011
New Revision: 1077806
URL: http://svn.apache.org/viewvc?rev=1077806&view=rev
Log:
commit d72894e09d7e58bc2cd3766c500a66e7aa0c3d0f
Author: Arun C Murthy <ac...@apache.org>
Date: Tue Mar 1 16:36:55 2011 -0800
Tuning out-of-band heartbeats, introduced a new config: mapreduce.tasktracker.outofband.heartbeat.damper.
Modified:
hadoop/common/branches/branch-0.20-security-203/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Modified: hadoop/common/branches/branch-0.20-security-203/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-203/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077806&r1=1077805&r2=1077806&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-203/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-203/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar 4 05:24:13 2011
@@ -45,6 +45,7 @@ import java.util.Vector;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import javax.crypto.SecretKey;
@@ -272,9 +273,13 @@ public class TaskTracker implements MRCo
static final String TT_OUTOFBAND_HEARBEAT =
"mapreduce.tasktracker.outofband.heartbeat";
private volatile boolean oobHeartbeatOnTaskCompletion;
+ static final String TT_OUTOFBAND_HEARTBEAT_DAMPER =
+ "mapreduce.tasktracker.outofband.heartbeat.damper";
+ static private final int DEFAULT_OOB_HEARTBEAT_DAMPER = 1000000;
+ private volatile int oobHeartbeatDamper;
// Track number of completed tasks to send an out-of-band heartbeat
- private IntWritable finishedCount = new IntWritable(0);
+ private AtomicInteger finishedCount = new AtomicInteger(0);
private MapEventsFetcherThread mapEventsFetcher;
final int workerThreads;
@@ -298,6 +303,7 @@ public class TaskTracker implements MRCo
* the minimum interval between jobtracker polls
*/
private volatile int heartbeatInterval = HEARTBEAT_INTERVAL_MIN;
+
/**
* Number of maptask completion events locations to poll for at one time
*/
@@ -735,6 +741,10 @@ public class TaskTracker implements MRCo
oobHeartbeatOnTaskCompletion =
fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false);
+
+ oobHeartbeatDamper =
+ fConf.getInt(TT_OUTOFBAND_HEARTBEAT_DAMPER,
+ DEFAULT_OOB_HEARTBEAT_DAMPER);
}
private void createInstrumentation() {
@@ -1401,25 +1411,39 @@ public class TaskTracker implements MRCo
return recentMapEvents;
}
+ private long getHeartbeatInterval(int numFinishedTasks) {
+ return (heartbeatInterval / (numFinishedTasks * oobHeartbeatDamper + 1));
+ }
+
/**
* Main service loop. Will stay in this loop forever.
*/
State offerService() throws Exception {
- long lastHeartbeat = 0;
+ long lastHeartbeat = System.currentTimeMillis();
while (running && !shuttingDown) {
try {
long now = System.currentTimeMillis();
-
- long waitTime = heartbeatInterval - (now - lastHeartbeat);
- if (waitTime > 0) {
+
+ // accelerate to account for multiple finished tasks up-front
+ long remaining =
+ (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
+ while (remaining > 0) {
// sleeps for the wait time or
- // until there are empty slots to schedule tasks
+ // until there are *enough* empty slots to schedule tasks
synchronized (finishedCount) {
- if (finishedCount.get() == 0) {
- finishedCount.wait(waitTime);
+ finishedCount.wait(remaining);
+
+ // Recompute
+ now = System.currentTimeMillis();
+ remaining =
+ (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
+
+ if (remaining <= 0) {
+ // Reset count
+ finishedCount.set(0);
+ break;
}
- finishedCount.set(0);
}
}
@@ -2187,8 +2211,7 @@ public class TaskTracker implements MRCo
private void notifyTTAboutTaskCompletion() {
if (oobHeartbeatOnTaskCompletion) {
synchronized (finishedCount) {
- int value = finishedCount.get();
- finishedCount.set(value+1);
+ finishedCount.incrementAndGet();
finishedCount.notify();
}
}