You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/06/01 21:51:51 UTC
svn commit: r410929 - in
/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred:
MapOutputLocation.java ReduceTaskRunner.java
Author: cutting
Date: Thu Jun 1 12:51:50 2006
New Revision: 410929
URL: http://svn.apache.org/viewvc?rev=410929&view=rev
Log:
HADOOP-259. Fix HTTP transfers of map output to time out. Contributed by Owen.
Modified:
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java?rev=410929&r1=410928&r2=410929&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java Thu Jun 1 12:51:50 2006
@@ -91,15 +91,26 @@
}
/**
+ * An interface for callbacks when a method makes some progress.
+ * @author Owen O'Malley
+ */
+ public static interface Pingable {
+ void ping(); // invoked by getFile after each buffer is written to the local file
+ }
+
+ /**
* Get the map output into a local file from the remote server.
* We use the file system so that we generate checksum files on the data.
* @param fileSys the filesystem to write the file to
* @param localFilename the filename to write the data into
* @param reduce the reduce id to get for
+ * @param pingee a status object that wants to know when we make progress
* @throws IOException when something goes wrong
*/
public long getFile(FileSystem fileSys,
- Path localFilename, int reduce) throws IOException {
+ Path localFilename,
+ int reduce,
+ Pingable pingee) throws IOException {
URL path = new URL(toString() + "&reduce=" + reduce);
InputStream input = path.openConnection().getInputStream();
OutputStream output = fileSys.create(localFilename);
@@ -110,6 +121,9 @@
while (len > 0) {
totalBytes += len;
output.write(buffer, 0 ,len);
+ if (pingee != null) {
+ pingee.ping();
+ }
len = input.read(buffer);
}
} finally {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?rev=410929&r1=410928&r2=410929&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Thu Jun 1 12:51:50 2006
@@ -78,6 +78,11 @@
* A reference to the local file system for writing the map outputs to.
*/
private FileSystem localFileSys;
+
+ /**
+ * The threads for fetching the files.
+ */
+ private MapOutputCopier[] copiers = null;
/**
* the minimum interval between jobtracker polls
@@ -109,13 +114,76 @@
public String getHost() { return loc.getHost(); }
public MapOutputLocation getLocation() { return loc; }
}
+
+ private static class PingTimer implements MapOutputLocation.Pingable { // records when the current copy last made progress
+ private long pingTime; // 0 = not mid-copy (initial value or after reset); else System.currentTimeMillis of last ping
+
+ public synchronized void reset() { // mark idle; the stall checker ignores a zero ping time
+ pingTime = 0;
+ }
+
+ public synchronized long getLastPing() { // 0 if idle, else the time of the most recent ping
+ return pingTime;
+ }
+
+ public void ping() { // same monitor as reset()/getLastPing(), so readers see the latest value
+ synchronized (this) {
+ pingTime = System.currentTimeMillis();
+ }
+ }
+ }
/** Copies map outputs as they become available */
private class MapOutputCopier extends Thread {
+ private PingTimer pingTimer = new PingTimer();
+ private MapOutputLocation currentLocation = null;
+
public MapOutputCopier() {
}
+ /**
+ * Get the last time that this copier made progress (0 if it has never pinged or was reset after its last copy).
+ * @return the System.currentTimeMillis of the last ping, or 0; NOTE(review): run() resets only when copyOutput returns normally, so a failed copy (IOException) leaves a stale non-zero value
+ */
+ public long getLastProgressTime() {
+ return pingTimer.getLastPing();
+ }
+
+ /**
+ * Fail the current file that we are fetching by posting a CopyResult of size -1 for it.
+ * @return true if a fetch was in progress (and has now been failed), false if this copier was idle
+ */
+ public synchronized boolean fail() {
+ if (currentLocation != null) {
+ finish(-1); // clears currentLocation, so the copier's own later finish() posts nothing
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Get the current map output location, or null if this copier is not fetching anything.
+ */
+ public synchronized MapOutputLocation getLocation() {
+ return currentLocation;
+ }
+
+ private synchronized void start(MapOutputLocation loc) { // record the in-flight location for fail()/finish(); NOTE(review): overloads Thread.start()
+ currentLocation = loc;
+ }
+
+ private synchronized void finish(long size) { // post one result per location; no-op if fail() already posted it
+ if (currentLocation != null) {
+ synchronized (copyResults) {
+ copyResults.add(new CopyResult(currentLocation, size));
+ copyResults.notify(); // wake a thread waiting in getCopyResult
+ }
+ currentLocation = null;
+ }
+ }
+
/** Loop forever and fetch map outputs as they become available.
* The thread exits when it is interrupted by the {@link ReduceTaskRunner}
*/
@@ -133,27 +201,28 @@
}
try {
- size = copyOutput(loc);
+ start(loc);
+ pingTimer.ping();
+ size = copyOutput(loc, pingTimer);
+ pingTimer.reset();
} catch (IOException e) {
LOG.warning(reduceTask.getTaskId() + " copy failed: " +
loc.getMapTaskId() + " from " + loc.getHost());
LOG.warning(StringUtils.stringifyException(e));
}
-
- synchronized (copyResults) {
- copyResults.add(new CopyResult(loc, size));
- copyResults.notifyAll();
- }
+ finish(size);
}
} catch (InterruptedException e) { } // ALL DONE!
}
/** Copies a a map output from a remote host, using raw RPC.
- * @param loc the map output location to be copied
+ * @param loc the map output location to be copied
+ * @param pingee a status object to ping as we make progress
* @return the size of the copied file
* @throws IOException if there is an error copying the file
*/
- private long copyOutput(MapOutputLocation loc)
+ private long copyOutput(MapOutputLocation loc,
+ MapOutputLocation.Pingable pingee)
throws IOException {
String reduceId = reduceTask.getTaskId();
@@ -165,7 +234,7 @@
Path filename = conf.getLocalPath(reduceId + "/map_" +
loc.getMapId() + ".out");
long bytes = loc.getFile(localFileSys, filename,
- reduceTask.getPartition());
+ reduceTask.getPartition(), pingee);
LOG.info(reduceTask.getTaskId() + " done copying " + loc.getMapTaskId() +
" output from " + loc.getHost() + ".");
@@ -181,6 +250,46 @@
}
+ private class MapCopyLeaseChecker extends Thread { // watchdog: fails and replaces copiers whose fetch has stopped pinging
+ private static final long STALLED_COPY_TIMEOUT = 3 * 60 * 1000; // ms without a ping before a copy counts as stalled
+ private static final long STALLED_COPY_CHECK = 60 * 1000; // ms between scans of the copier array
+ private long lastStalledCheck = 0;
+
+ public void run() {
+ try {
+ while (true) {
+ long currentTime = System.currentTimeMillis();
+ if (currentTime - lastStalledCheck > STALLED_COPY_CHECK) {
+ lastStalledCheck = currentTime;
+ synchronized (copiers) {
+ for(int i=0; i < copiers.length; ++i) {
+ if (copiers[i] == null) { // entries are nulled by the runner at shutdown
+ break;
+ }
+ long lastProgress = copiers[i].getLastProgressTime();
+ if (lastProgress != 0 && // 0 = copier is not mid-copy
+ currentTime - lastProgress > STALLED_COPY_TIMEOUT) {
+ LOG.warning("Map output copy stalled on " + // NOTE(review): getLocation() is null if the copier is idle with a stale ping (run() skips reset() after an IOException)
+ copiers[i].getLocation());
+ // mark the current file as failed (posts a size -1 CopyResult if a copy is in progress)
+ copiers[i].fail();
+ // tell the thread to stop (NOTE(review): interrupt() may not unblock a read on the URL stream, so the old thread could keep running - confirm)
+ copiers[i].interrupt();
+ // create a replacement thread
+ copiers[i] = new MapOutputCopier();
+ copiers[i].start();
+ }
+ }
+ }
+ } else {
+ Thread.sleep(lastStalledCheck + STALLED_COPY_CHECK - currentTime); // sleep until the next scan is due
+ }
+ }
+ } catch (InterruptedException ie) {} // interrupted by the runner when copying is done: exit
+
+ }
+ }
+
public ReduceTaskRunner(Task task, TaskTracker tracker,
JobConf conf) throws IOException {
super(task, tracker, conf);
@@ -218,6 +327,7 @@
DecimalFormat mbpsFormat = new DecimalFormat("0.00");
Random backoff = new Random();
final Progress copyPhase = getTask().getProgress().phase();
+ MapCopyLeaseChecker leaseChecker = null;
for (int i = 0; i < numOutputs; i++) {
neededOutputs.add(new Integer(i));
@@ -225,13 +335,15 @@
}
InterTrackerProtocol jobClient = getTracker().getJobClient();
- MapOutputCopier[] copiers = new MapOutputCopier[numCopiers];
+ copiers = new MapOutputCopier[numCopiers];
// start all the copying threads
for (int i=0; i < copiers.length; i++) {
copiers[i] = new MapOutputCopier();
copiers[i].start();
}
+ leaseChecker = new MapCopyLeaseChecker();
+ leaseChecker.start();
// start the clock for bandwidth measurement
long startTime = System.currentTimeMillis();
@@ -314,34 +426,36 @@
while (!killed && numInFlight > 0) {
CopyResult cr = getCopyResult();
- if (cr.getSuccess()) { // a successful copy
- numCopied++;
- bytesTransferred += cr.getSize();
+ if (cr != null) {
+ if (cr.getSuccess()) { // a successful copy
+ numCopied++;
+ bytesTransferred += cr.getSize();
- long secsSinceStart = (System.currentTimeMillis()-startTime)/1000+1;
- float mbs = ((float)bytesTransferred)/(1024*1024);
- float transferRate = mbs/secsSinceStart;
+ long secsSinceStart = (System.currentTimeMillis()-startTime)/1000+1;
+ float mbs = ((float)bytesTransferred)/(1024*1024);
+ float transferRate = mbs/secsSinceStart;
- copyPhase.startNextPhase();
- copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs + " at " +
- mbpsFormat.format(transferRate) + " MB/s)");
- getTask().reportProgress(getTracker());
- }
- else {
- // this copy failed, put it back onto neededOutputs
- neededOutputs.add(new Integer(cr.getMapId()));
+ copyPhase.startNextPhase();
+ copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs +
+ " at " +
+ mbpsFormat.format(transferRate) + " MB/s)");
+ getTask().reportProgress(getTracker());
+ } else {
+ // this copy failed, put it back onto neededOutputs
+ neededOutputs.add(new Integer(cr.getMapId()));
- // wait a random amount of time for next contact
- currentTime = System.currentTimeMillis();
- long nextContact = currentTime + 60 * 1000 +
- backoff.nextInt(maxBackoff*1000);
- penaltyBox.put(cr.getHost(), new Long(nextContact));
- LOG.warning(reduceTask.getTaskId() + " adding host " +
- cr.getHost() + " to penalty box, next contact in " +
- ((nextContact-currentTime)/1000) + " seconds");
+ // wait a random amount of time for next contact
+ currentTime = System.currentTimeMillis();
+ long nextContact = currentTime + 60 * 1000 +
+ backoff.nextInt(maxBackoff*1000);
+ penaltyBox.put(cr.getHost(), new Long(nextContact));
+ LOG.warning(reduceTask.getTaskId() + " adding host " +
+ cr.getHost() + " to penalty box, next contact in " +
+ ((nextContact-currentTime)/1000) + " seconds");
+ }
+ uniqueHosts.remove(cr.getHost());
+ numInFlight--;
}
- uniqueHosts.remove(cr.getHost());
- numInFlight--;
// ensure we have enough to keep us busy
if (numInFlight < lowThreshold && (numOutputs-numCopied) > PROBE_SAMPLE_SIZE) {
@@ -352,9 +466,13 @@
}
// all done, inform the copiers to exit
- synchronized (scheduledCopies) {
- for (int i=0; i < copiers.length; i++) {
- copiers[i].interrupt();
+ leaseChecker.interrupt();
+ synchronized (copiers) {
+ synchronized (scheduledCopies) {
+ for (int i=0; i < copiers.length; i++) {
+ copiers[i].interrupt();
+ copiers[i] = null;
+ }
}
}
@@ -363,17 +481,18 @@
private CopyResult getCopyResult() {
- CopyResult cr = null;
-
- synchronized (copyResults) {
- while (copyResults.isEmpty()) {
+ synchronized (copyResults) { // block until a result is posted or the runner is killed
+ while (!killed && copyResults.isEmpty()) { // kill() notifies copyResults so this loop re-checks killed
try {
copyResults.wait();
} catch (InterruptedException e) { }
}
- cr = (CopyResult)copyResults.remove(0);
+ if (copyResults.isEmpty()) {
+ return null; // killed with nothing pending; the caller skips a null result
+ } else {
+ return (CopyResult) copyResults.remove(0);
+ }
}
- return cr;
}
/** Queries the job tracker for a set of outputs ready to be copied
@@ -419,6 +538,17 @@
public void close() throws IOException {
getTask().getProgress().setStatus("closed");
this.mapOutputFile.removeAll(getTask().getTaskId());
+ }
+
+ /**
+ * Kill the child process, but also wake getCopyResult so that it re-checks
+ * the kill flag (presumably set by super.kill() - confirm in the superclass).
+ */
+ public void kill() {
+ synchronized (copyResults) { // held so the flag change and notify can't fall between getCopyResult's check and its wait()
+ super.kill();
+ copyResults.notify();
+ }
}
}