You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/05/25 00:36:55 UTC
svn commit: r409261 - in /lucene/hadoop/trunk: ./ conf/
src/java/org/apache/hadoop/ipc/ src/java/org/apache/hadoop/mapred/
src/webapps/job/
Author: cutting
Date: Wed May 24 15:36:55 2006
New Revision: 409261
URL: http://svn.apache.org/viewvc?rev=409261&view=rev
Log:
HADOOP-195. Improve performance of transfer of map outputs to reduce nodes by performing multiple transfers in parallel, each using a separate socket.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/conf/hadoop-default.xml
lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=409261&r1=409260&r2=409261&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed May 24 15:36:55 2006
@@ -58,6 +58,10 @@
15. HADOOP-247. Fix sort progress to better handle exceptions.
(Mahadev Konar via cutting)
+16. HADOOP-195. Improve performance of the transfer of map outputs to
+ reduce nodes by performing multiple transfers in parallel, each on
+ a separate socket. (Sameer Paranjpye via cutting)
+
Release 0.2.1 - 2006-05-12
Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=409261&r1=409260&r2=409261&view=diff
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Wed May 24 15:36:55 2006
@@ -230,6 +230,14 @@
</property>
<property>
+ <name>mapred.reduce.parallel.copies</name>
+ <value>5</value>
+ <description>The default number of parallel transfers run by reduce
+ during the copy (shuffle) phase.
+ </description>
+</property>
+
+<property>
<name>mapred.task.timeout</name>
<value>600000</value>
<description>The number of milliseconds before a task will be
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=409261&r1=409260&r2=409261&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java Wed May 24 15:36:55 2006
@@ -23,6 +23,7 @@
import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
+import java.net.Socket;
import java.util.logging.*;
import java.io.*;
@@ -184,6 +185,64 @@
values[i] = ((ObjectWritable)wrappedValues[i]).get();
return values;
+ }
+
+
+ /** Expert: Make an RPC call over the specified socket. Assumes that no other calls
+ * are in flight on this connection. */
+ public static Object callRaw(Method method, Object[] params,
+ Socket sock, Configuration conf)
+ throws IOException {
+
+ Invocation inv = new Invocation(method, params);
+ DataInputStream in =
+ new DataInputStream(new BufferedInputStream(sock.getInputStream()));
+ DataOutputStream out =
+ new DataOutputStream(new BufferedOutputStream(sock.getOutputStream()));
+ String name = new String("Client connection to " +
+ sock.getInetAddress().getHostName() +
+ ":" + sock.getPort());
+
+ try {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.fine(name + " sending #0");
+ }
+
+ // write out method invocation
+ out.writeInt(0);
+ inv.write(out);
+ out.flush();
+
+ // read return value
+ int callId = in.readInt();
+
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.fine(name + " got response to call #" + callId);
+ }
+
+ boolean isError = in.readBoolean();
+ if (isError) {
+ throw new RemoteException(WritableUtils.readString(in),
+ WritableUtils.readString(in));
+ }
+ else {
+
+ Writable wrappedValue = (Writable)ObjectWritable.class.newInstance();
+ if (wrappedValue instanceof Configurable) {
+ ((Configurable) wrappedValue).setConf(conf);
+ }
+ wrappedValue.readFields(in);
+
+ return method.getReturnType() != Void.TYPE ?
+ ((ObjectWritable)wrappedValue).get() : null;
+ }
+ }
+ catch (InstantiationException e) {
+ throw new IOException(e.toString());
+ }
+ catch (IllegalAccessException e) {
+ throw new IOException(e.toString());
+ }
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=409261&r1=409260&r2=409261&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Wed May 24 15:36:55 2006
@@ -101,12 +101,27 @@
public void run() {
LOG.info(getName() + ": starting");
while (running) {
+ Socket acceptedSock = null;
try {
- new Connection(socket.accept()).start(); // start a new connection
+ acceptedSock = socket.accept();
+ new Connection(acceptedSock).start(); // start a new connection
} catch (SocketTimeoutException e) { // ignore timeouts
- } catch (Exception e) { // log all other exceptions
- LOG.log(Level.INFO, getName() + " caught: " + e, e);
+ } catch (OutOfMemoryError e) {
+ // we can run out of memory if we have too many threads
+ // log the event and sleep for a minute and give
+ // some thread(s) a chance to finish
+ LOG.log(Level.WARNING,
+ getName() + " out of memory, sleeping...", e);
+ try {
+ acceptedSock.close();
+ Thread.sleep(60000);
+ } catch (InterruptedException ie) { // ignore interrupts
+ } catch (IOException ioe) { // ignore IOexceptions
+ }
}
+ catch (Exception e) { // log all other exceptions
+ LOG.log(Level.INFO, getName() + " caught: " + e, e);
+ }
}
try {
socket.close();
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java?rev=409261&r1=409260&r2=409261&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java Wed May 24 15:36:55 2006
@@ -41,6 +41,7 @@
private String reduceTaskId;
private int mapId;
private int partition;
+ private long size;
/** Permits reporting of file copy progress. */
public interface ProgressReporter {
@@ -101,6 +102,10 @@
private FileSystem getLocalFs() throws IOException {
return FileSystem.getNamed("local", this.jobConf);
}
+
+ public long getSize() {
+ return size;
+ }
public void write(DataOutput out) throws IOException {
UTF8.writeString(out, mapTaskId);
@@ -112,7 +117,8 @@
FSDataInputStream in = null;
try {
// write the length-prefixed file content to the wire
- out.writeLong(getLocalFs().getLength(file));
+ this.size = getLocalFs().getLength(file);
+ out.writeLong(this.size);
in = getLocalFs().open(file);
} catch (FileNotFoundException e) {
TaskTracker.LOG.log(Level.SEVERE, "Can't open map output:" + file, e);
@@ -120,7 +126,7 @@
throw e;
}
try {
- byte[] buffer = new byte[8192];
+ byte[] buffer = new byte[65536];
int l = 0;
while (l != -1) {
@@ -149,11 +155,13 @@
// read the length-prefixed file content into a local file
Path file = getInputFile(mapId, reduceTaskId);
long length = in.readLong();
+ this.size = length;
+
float progPerByte = 1.0f / length;
long unread = length;
FSDataOutputStream out = getLocalFs().create(file);
try {
- byte[] buffer = new byte[8192];
+ byte[] buffer = new byte[65536];
while (unread > 0) {
int bytesToRead = (int)Math.min(unread, buffer.length);
in.readFully(buffer, 0, bytesToRead);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?rev=409261&r1=409260&r2=409261&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Wed May 24 15:36:55 2006
@@ -21,122 +21,423 @@
import java.io.*;
import java.net.*;
import java.util.*;
-import java.util.logging.*;
+import java.lang.reflect.Method;
+import java.text.DecimalFormat;
+
/** Runs a reduce task. */
class ReduceTaskRunner extends TaskRunner {
+
+ /**
+ * for cleaning up old map outputs
+ */
private MapOutputFile mapOutputFile;
+
+ /**
+ * our reduce task instance
+ */
+ private ReduceTask reduceTask;
+
+ /**
+ * the list of map outputs currently being copied
+ */
+ private List scheduledCopies;
+
+ /**
+ * the results of dispatched copy attempts
+ */
+ private List copyResults;
+
+ /**
+ * the number of outputs to copy in parallel
+ */
+ private int numCopiers;
+
+ /**
+ * timeout for copy operations
+ */
+ private int copyTimeout;
+
+ /**
+ * the maximum amount of time (less 1 minute), in seconds, to wait to
+ * contact a host after a copy from it fails. We wait for 1 minute plus
+ * a random interval of up to maxBackoff seconds.
+ */
+ private int maxBackoff;
+
+ /**
+ * busy hosts from which copies are being backed off
+ * Map of host -> next contact time
+ */
+ private Map penaltyBox;
+
+ /**
+ * the set of unique hosts from which we are copying
+ */
+ private Set uniqueHosts;
+
+ /**
+ * the last time we polled the job tracker
+ */
+ private long lastPollTime;
+
+ /**
+ * the minimum interval between jobtracker polls
+ */
+ private static final long MIN_POLL_INTERVAL = 5000;
+
+ /**
+ * the number of map output locations to poll for at one time
+ */
+ private static final int PROBE_SAMPLE_SIZE = 50;
+
+ // initialization code to resolve "getFile" to a method object
+ private static Method getFileMethod = null;
+ static {
+ Class[] paramTypes = { String.class, String.class,
+ int.class, int.class };
+ try {
+ getFileMethod =
+ MapOutputProtocol.class.getDeclaredMethod("getFile", paramTypes);
+ }
+ catch (NoSuchMethodException e) {
+ LOG.severe(StringUtils.stringifyException(e));
+ throw new RuntimeException("Can't find \"getFile\" method "
+ + "of MapOutputProtocol", e);
+ }
+ }
+
+ /** Represents the result of an attempt to copy a map output */
+ private class CopyResult {
+
+ // the map output location against which a copy attempt was made
+ private final MapOutputLocation loc;
+
+ // the size of the file copied, -1 if the transfer failed
+ private final long size;
+
+ CopyResult(MapOutputLocation loc, long size) {
+ this.loc = loc;
+ this.size = size;
+ }
+
+ public int getMapId() { return loc.getMapId(); }
+ public boolean getSuccess() { return size >= 0; }
+ public long getSize() { return size; }
+ public String getHost() { return loc.getHost(); }
+ public MapOutputLocation getLocation() { return loc; }
+ }
+
+ /** Copies map outputs as they become available */
+ private class MapOutputCopier extends Thread {
+ public MapOutputCopier() {
+ }
+
+ /** Loop forever and fetch map outputs as they become available.
+ * The thread exits when it is interrupted by the {@link ReduceTaskRunner}
+ */
+ public void run() {
+ try {
+ while (true) {
+ MapOutputLocation loc = null;
+ long size = -1;
+
+ synchronized (scheduledCopies) {
+ while (scheduledCopies.isEmpty()) {
+ scheduledCopies.wait();
+ }
+ loc = (MapOutputLocation)scheduledCopies.remove(0);
+ }
+
+ try {
+ size = copyOutput(loc);
+ } catch (IOException e) {
+ LOG.warning(reduceTask.getTaskId() + " copy failed: " +
+ loc.getMapTaskId() + " from " + loc.getHost());
+ LOG.warning(StringUtils.stringifyException(e));
+ }
+
+ synchronized (copyResults) {
+ copyResults.add(new CopyResult(loc, size));
+ copyResults.notifyAll();
+ }
+ }
+ } catch (InterruptedException e) { } // ALL DONE!
+ }
+
+ /** Copies a map output from a remote host, using raw RPC.
+ * @param loc the map output location to be copied
+ * @return the size of the copied file
+ * @throws IOException if there is an error copying the file
+ */
+ private long copyOutput(MapOutputLocation loc)
+ throws IOException {
+
+ Object[] params = new Object[4];
+ params[0] = loc.getMapTaskId();
+ params[1] = reduceTask.getTaskId();
+ params[2] = new Integer(loc.getMapId());
+ params[3] = new Integer(reduceTask.getPartition());
+
+ LOG.info(reduceTask.getTaskId() + " copy started: " +
+ loc.getMapTaskId() + " from " + loc.getHost());
+
+ Socket sock = new Socket(loc.getHost(), loc.getPort());
+ try {
+ sock.setSoTimeout(copyTimeout);
+
+ // this copies the map output file
+ MapOutputFile file =
+ (MapOutputFile)RPC.callRaw(getFileMethod, params, sock, conf);
+
+ LOG.info(reduceTask.getTaskId() + " copy finished: " +
+ loc.getMapTaskId() + " from " + loc.getHost());
+
+ return file.getSize();
+ }
+ finally {
+ try {
+ sock.close();
+ } catch (IOException e) { } // IGNORE
+ }
+ }
+
+ }
+
public ReduceTaskRunner(Task task, TaskTracker tracker, JobConf conf) {
super(task, tracker, conf);
this.mapOutputFile = new MapOutputFile();
this.mapOutputFile.setConf(conf);
+
+ this.reduceTask = (ReduceTask)getTask();
+ this.scheduledCopies = new ArrayList(100);
+ this.copyResults = new ArrayList(100);
+ this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
+ this.copyTimeout = conf.getInt("ipc.client.timeout", 10000);
+ this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
+
+ // hosts -> next contact time
+ this.penaltyBox = new Hashtable();
+
+ // hostnames
+ this.uniqueHosts = new HashSet();
+
+ this.lastPollTime = 0;
}
- /** Assemble all of the map output files. */
+ /** Assemble all of the map output files */
public boolean prepare() throws IOException {
- ReduceTask task = ((ReduceTask)getTask());
- this.mapOutputFile.removeAll(task.getTaskId()); // cleanup from failures
- int numMaps = task.getNumMaps();
+
+ // cleanup from failures
+ this.mapOutputFile.removeAll(reduceTask.getTaskId());
+
+ final int numOutputs = reduceTask.getNumMaps();
+ List neededOutputs = new ArrayList(numOutputs);
+ List knownOutputs = new ArrayList(100);
+ int numInFlight = 0, numCopied = 0;
+ int lowThreshold = numCopiers*2;
+ long bytesTransferred = 0;
+ DecimalFormat mbpsFormat = new DecimalFormat("0.00");
+ Random backoff = new Random();
final Progress copyPhase = getTask().getProgress().phase();
-
- // we need input from every map task
- List needed = new ArrayList(numMaps);
- for (int i = 0; i < numMaps; i++) {
- needed.add(new Integer(i));
- copyPhase.addPhase(); // add sub-phase per file
+
+ for (int i = 0; i < numOutputs; i++) {
+ neededOutputs.add(new Integer(i));
+ copyPhase.addPhase(); // add sub-phase per file
}
InterTrackerProtocol jobClient = getTracker().getJobClient();
- while (needed.size() > 0) {
- LOG.info(task.getTaskId()+" Need "+needed.size()+" map output(s).");
- getTask().reportProgress(getTracker());
-
- // query for a just a random subset of needed segments so that we don't
- // overwhelm jobtracker. ideally perhaps we could send a more compact
- // representation of all needed, i.e., a bit-vector
- int checkSize = Math.min(10, needed.size());
- int[] neededIds = new int[checkSize];
- Collections.shuffle(needed);
- ListIterator itr = needed.listIterator();
- for (int i = 0; i < checkSize; i++) {
- neededIds[i] = ((Integer) itr.next()).intValue();
- }
- MapOutputLocation[] locs = null;
- try {
- locs = jobClient.locateMapOutputs(task.getJobId().toString(),
- neededIds, task.getPartition());
- } catch (IOException ie) {
- LOG.info("Problem locating map outputs: " +
- StringUtils.stringifyException(ie));
- }
- if (locs == null || locs.length == 0) {
- try {
- if (killed) {
- return false;
- }
- LOG.info(task.getTaskId()+" No map outputs available; sleeping...");
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- }
- continue;
- }
-
- LOG.info(task.getTaskId()+" Got "+locs.length+" map output locations.");
-
- // try each of these locations
- for (int i = 0; i < locs.length; i++) {
- MapOutputLocation loc = locs[i];
- InetSocketAddress addr =
- new InetSocketAddress(loc.getHost(), loc.getPort());
- MapOutputProtocol client =
- (MapOutputProtocol)RPC.getProxy(MapOutputProtocol.class, addr, this.conf);
-
- this.mapOutputFile.setProgressReporter(new MapOutputFile.ProgressReporter() {
- public void progress(float progress) {
- copyPhase.phase().set(progress);
- try {
- getTask().reportProgress(getTracker());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- });
+ MapOutputCopier[] copiers = new MapOutputCopier[numCopiers];
- getTask().reportProgress(getTracker());
+ // start all the copying threads
+ for (int i=0; i < copiers.length; i++) {
+ copiers[i] = new MapOutputCopier();
+ copiers[i].start();
+ }
+
+ // start the clock for bandwidth measurement
+ long startTime = System.currentTimeMillis();
+ long currentTime = startTime;
+
+ // loop until we get all required outputs or are killed
+ while (!killed && numCopied < numOutputs) {
+
+ LOG.info(reduceTask.getTaskId() + " Need " + (numOutputs-numCopied) +
+ " map output(s)");
+
+ if (!neededOutputs.isEmpty()) {
+ LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
+ " map output location(s)");
try {
- copyPhase.phase().setStatus(loc.toString());
+ MapOutputLocation[] locs = queryJobTracker(neededOutputs, jobClient);
- LOG.info(task.getTaskId()+" Copying "+loc.getMapTaskId()
- +" output from "+loc.getHost()+".");
- client.getFile(loc.getMapTaskId(), task.getTaskId(),
- loc.getMapId(),
- task.getPartition());
-
- // Success: remove from 'needed'
- for (Iterator it = needed.iterator(); it.hasNext(); ) {
- int mapId = ((Integer) it.next()).intValue();
- if (mapId == loc.getMapId()) {
- it.remove();
- break;
- }
+ // remove discovered outputs from needed list
+ // and put them on the known list
+ for (int i=0; i < locs.length; i++) {
+ neededOutputs.remove(new Integer(locs[i].getMapId()));
+ knownOutputs.add(locs[i]);
+ }
+ LOG.info(reduceTask.getTaskId() +
+ " Got " + (locs == null ? 0 : locs.length) +
+ " map outputs from jobtracker");
+ }
+ catch (IOException ie) {
+ LOG.warning(reduceTask.getTaskId() +
+ " Problem locating map outputs: " +
+ StringUtils.stringifyException(ie));
+ }
+ }
+
+ // now walk through the cache and schedule what we can
+ int numKnown = knownOutputs.size(), numScheduled = 0;
+ int numSlow = 0, numDups = 0;
+
+ LOG.info(reduceTask.getTaskId() + " Got " + numKnown +
+ " known map output location(s); scheduling...");
+
+ synchronized (scheduledCopies) {
+ ListIterator locIt = knownOutputs.listIterator();
+
+ currentTime = System.currentTimeMillis();
+ while (locIt.hasNext()) {
+
+ MapOutputLocation loc = (MapOutputLocation)locIt.next();
+ Long penaltyEnd = (Long)penaltyBox.get(loc.getHost());
+ boolean penalized = false, duplicate = false;
+
+ if (penaltyEnd != null && currentTime < penaltyEnd.longValue()) {
+ penalized = true; numSlow++;
}
+ if (uniqueHosts.contains(loc.getHost())) {
+ duplicate = true; numDups++;
+ }
+
+ if (!penalized && !duplicate) {
+ uniqueHosts.add(loc.getHost());
+ scheduledCopies.add(loc);
+ locIt.remove(); // remove from knownOutputs
+ numInFlight++; numScheduled++;
+ }
+ }
+ scheduledCopies.notifyAll();
+ }
+ LOG.info(reduceTask.getTaskId() + " Scheduled " + numScheduled +
+ " of " + numKnown + " known outputs (" + numSlow +
+ " slow hosts and " + numDups + " dup hosts)");
+
+ // if we have no copies in flight and we can't schedule anything
+ // new, just wait for a bit
+ try {
+ if (numInFlight == 0 && numScheduled == 0) {
+ Thread.sleep(5000);
+ }
+ } catch (InterruptedException e) { } // IGNORE
+
+ while (!killed && numInFlight > 0) {
+ CopyResult cr = getCopyResult();
+
+ if (cr.getSuccess()) { // a successful copy
+ numCopied++;
+ bytesTransferred += cr.getSize();
+
+ long secsSinceStart = (System.currentTimeMillis()-startTime)/1000+1;
+ float mbs = ((float)bytesTransferred)/(1024*1024);
+ float transferRate = mbs/secsSinceStart;
+
copyPhase.startNextPhase();
+ copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs + " at " +
+ mbpsFormat.format(transferRate) + " MB/s)");
+ getTask().reportProgress(getTracker());
+ }
+ else {
+ // this copy failed, put it back onto neededOutputs
+ neededOutputs.add(new Integer(cr.getMapId()));
- } catch (IOException e) { // failed: try again later
- LOG.log(Level.WARNING,
- task.getTaskId()+" copy failed: "
- +loc.getMapTaskId()+" from "+addr,
- e);
- } finally {
- this.mapOutputFile.setProgressReporter(null);
+ // wait a random amount of time for next contact
+ currentTime = System.currentTimeMillis();
+ long nextContact = currentTime + 60 * 1000 +
+ backoff.nextInt(maxBackoff*1000);
+ penaltyBox.put(cr.getHost(), new Long(nextContact));
+ LOG.warning(reduceTask.getTaskId() + " adding host " +
+ cr.getHost() + " to penalty box, next contact in " +
+ ((nextContact-currentTime)/1000) + " seconds");
+ }
+ uniqueHosts.remove(cr.getHost());
+ numInFlight--;
+
+ // ensure we have enough to keep us busy
+ if (numInFlight < lowThreshold && (numOutputs-numCopied) > PROBE_SAMPLE_SIZE) {
+ break;
}
}
+
+ }
+
+ // all done, inform the copiers to exit
+ synchronized (scheduledCopies) {
+ for (int i=0; i < copiers.length; i++) {
+ copiers[i].interrupt();
+ }
+ }
+
+ return numCopied == numOutputs && !killed;
+ }
+
+
+ private CopyResult getCopyResult() {
+ CopyResult cr = null;
+
+ synchronized (copyResults) {
+ while (copyResults.isEmpty()) {
+ try {
+ copyResults.wait();
+ } catch (InterruptedException e) { }
+ }
+ cr = (CopyResult)copyResults.remove(0);
+ }
+ return cr;
+ }
+
+ /** Queries the job tracker for a set of outputs ready to be copied
+ * @param neededOutputs the list of currently unknown outputs
+ * @param jobClient the job tracker
+ * @return a set of locations to copy outputs from
+ * @throws IOException
+ */
+ private MapOutputLocation[] queryJobTracker(List neededOutputs,
+ InterTrackerProtocol jobClient)
+ throws IOException {
+
+ // query for just a random subset of needed segments so that we don't
+ // overwhelm jobtracker. ideally perhaps we could send a more compact
+ // representation of all needed, i.e., a bit-vector
+ int checkSize = Math.min(PROBE_SAMPLE_SIZE, neededOutputs.size());
+ int neededIds[] = new int[checkSize];
+
+ Collections.shuffle(neededOutputs);
+
+ ListIterator itr = neededOutputs.listIterator();
+ for (int i=0; i < checkSize; i++) {
+ neededIds[i] = ((Integer)itr.next()).intValue();
}
- getTask().reportProgress(getTracker());
- return true;
+
+ long currentTime = System.currentTimeMillis();
+ long pollTime = lastPollTime + MIN_POLL_INTERVAL;
+ while (currentTime < pollTime) {
+ try {
+ Thread.sleep(pollTime-currentTime);
+ } catch (InterruptedException ie) { } // IGNORE
+ currentTime = System.currentTimeMillis();
+ }
+ lastPollTime = pollTime;
+
+ return jobClient.locateMapOutputs(reduceTask.getJobId().toString(),
+ neededIds,
+ reduceTask.getPartition());
}
+
/** Delete all of the temporary map output files. */
public void close() throws IOException {
getTask().getProgress().setStatus("closed");
Modified: lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp?rev=409261&r1=409260&r2=409261&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp Wed May 24 15:36:55 2006
@@ -90,7 +90,10 @@
%>
<html>
+<head>
+<meta http-equiv="refresh" content=60>
<title>Hadoop <%=jobid%> on <%=trackerName%></title>
+</head>
<body>
<h1>Hadoop <%=jobid%> on <a href="/jobtracker.jsp"><%=trackerName%></a></h1>