You are viewing a plain text version of this content. The canonical link to it (a hyperlink in the original page) was not preserved in this plain-text version.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/12/01 23:26:33 UTC
svn commit: r481429 - in /lucene/hadoop/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
src/java/org/apache/hadoop/mapred/TaskRunner.java
Author: cutting
Date: Fri Dec 1 14:26:30 2006
New Revision: 481429
URL: http://svn.apache.org/viewvc?view=rev&rev=481429
Log:
HADOOP-750. Fix a potential race condition during the mapreduce shuffle. Contributed by Owen.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=481429&r1=481428&r2=481429
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Dec 1 14:26:30 2006
@@ -143,6 +143,9 @@
42. HADOOP-430. Stop datanode's HTTP server when registration with
namenode fails. (Wendy Chien via cutting)
+43. HADOOP-750. Fix a potential race condition during mapreduce
+ shuffle. (omalley via cutting)
+
Release 0.8.0 - 2006-11-03
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?view=diff&rev=481429&r1=481428&r2=481429
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Fri Dec 1 14:26:30 2006
@@ -121,24 +121,14 @@
}
private class PingTimer implements Progressable {
- private long pingTime;
-
- public synchronized void reset() {
- pingTime = 0;
- }
-
- public synchronized long getLastPing() {
- return pingTime;
- }
+ Task task = getTask();
+ TaskTracker tracker = getTracker();
public void progress() {
- synchronized (this) {
- pingTime = System.currentTimeMillis();
- getTask().reportProgress(getTracker());
- }
+ task.reportProgress(tracker);
}
}
-
+
private static int nextMapOutputCopierId = 0;
/** Copies map outputs as they become available */
@@ -149,14 +139,8 @@
private int id = nextMapOutputCopierId++;
public MapOutputCopier() {
- }
-
- /**
- * Get the last time that this copier made progress.
- * @return the System.currentTimeMillis when this copier last made progress
- */
- public long getLastProgressTime() {
- return pingTimer.getLastPing();
+ setName("MapOutputCopier " + reduceTask.getTaskId() + "." + id);
+ LOG.debug(getName() + " created");
}
/**
@@ -185,6 +169,7 @@
private synchronized void finish(long size) {
if (currentLocation != null) {
+ LOG.debug(getName() + " finishing " + currentLocation + " = " + size);
synchronized (copyResults) {
copyResults.add(new CopyResult(currentLocation, size));
copyResults.notify();
@@ -211,15 +196,14 @@
try {
start(loc);
- pingTimer.progress();
size = copyOutput(loc, pingTimer);
- pingTimer.reset();
} catch (IOException e) {
LOG.warn(reduceTask.getTaskId() + " copy failed: " +
loc.getMapTaskId() + " from " + loc.getHost());
LOG.warn(StringUtils.stringifyException(e));
+ } finally {
+ finish(size);
}
- finish(size);
} catch (InterruptedException e) {
return; // ALL DONE
} catch (Throwable th) {
@@ -268,49 +252,6 @@
}
}
-
- private class MapCopyLeaseChecker extends Thread {
- private static final long STALLED_COPY_CHECK = 60 * 1000;
- private long lastStalledCheck = 0;
-
- public void run() {
- while (true) {
- try {
- long currentTime = System.currentTimeMillis();
- if (currentTime - lastStalledCheck > STALLED_COPY_CHECK) {
- lastStalledCheck = currentTime;
- synchronized (copiers) {
- for(int i=0; i < copiers.length; ++i) {
- if (copiers[i] == null) {
- break;
- }
- long lastProgress = copiers[i].getLastProgressTime();
- if (lastProgress != 0 &&
- currentTime - lastProgress > STALLED_COPY_TIMEOUT) {
- LOG.warn("Map output copy stalled on " +
- copiers[i].getLocation());
- // mark the current file as failed
- copiers[i].fail();
- // tell the thread to stop
- copiers[i].interrupt();
- // create a replacement thread
- copiers[i] = new MapOutputCopier();
- copiers[i].start();
- }
- }
- }
- } else {
- Thread.sleep(lastStalledCheck + STALLED_COPY_CHECK - currentTime);
- }
- } catch (InterruptedException ie) {
- return;
- } catch (Throwable th) {
- LOG.error("MapCopyLeaseChecker error: " +
- StringUtils.stringifyException(th));
- }
- }
- }
- }
public ReduceTaskRunner(Task task, TaskTracker tracker,
JobConf conf) throws IOException {
@@ -352,7 +293,6 @@
DecimalFormat mbpsFormat = new DecimalFormat("0.00");
Random backoff = new Random();
final Progress copyPhase = getTask().getProgress().phase();
- MapCopyLeaseChecker leaseChecker = null;
for (int i = 0; i < numOutputs; i++) {
neededOutputs.add(new Integer(i));
@@ -367,8 +307,6 @@
copiers[i] = new MapOutputCopier();
copiers[i].start();
}
- leaseChecker = new MapCopyLeaseChecker();
- leaseChecker.start();
// start the clock for bandwidth measurement
long startTime = System.currentTimeMillis();
@@ -450,6 +388,7 @@
} catch (InterruptedException e) { } // IGNORE
while (!killed && numInFlight > 0) {
+ LOG.debug(reduceTask.getTaskId() + " numInFlight = " + numInFlight);
CopyResult cr = getCopyResult();
if (cr != null) {
@@ -506,7 +445,6 @@
}
// all done, inform the copiers to exit
- leaseChecker.interrupt();
synchronized (copiers) {
synchronized (scheduledCopies) {
for (int i=0; i < copiers.length; i++) {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=481429&r1=481428&r2=481429
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Fri Dec 1 14:26:30 2006
@@ -33,7 +33,7 @@
public static final Log LOG =
LogFactory.getLog("org.apache.hadoop.mapred.TaskRunner");
- boolean killed = false;
+ volatile boolean killed = false;
private Process process;
private Task t;
private TaskTracker tracker;