You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/05/25 00:36:55 UTC

svn commit: r409261 - in /lucene/hadoop/trunk: ./ conf/ src/java/org/apache/hadoop/ipc/ src/java/org/apache/hadoop/mapred/ src/webapps/job/

Author: cutting
Date: Wed May 24 15:36:55 2006
New Revision: 409261

URL: http://svn.apache.org/viewvc?rev=409261&view=rev
Log:
HADOOP-195.  Improve performance of transfer of map outputs to reduce nodes by performing multiple transfers in parallel, each using a separate socket.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
    lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=409261&r1=409260&r2=409261&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed May 24 15:36:55 2006
@@ -58,6 +58,10 @@
 15. HADOOP-247.  Fix sort progress to better handle exceptions.
     (Mahadev Konar via cutting)
 
+16. HADOOP-195.  Improve performance of the transfer of map outputs to
+    reduce nodes by performing multiple transfers in parallel, each on
+    a separate socket.  (Sameer Paranjpye via cutting)
+
 
 Release 0.2.1 - 2006-05-12
 

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=409261&r1=409260&r2=409261&view=diff
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Wed May 24 15:36:55 2006
@@ -230,6 +230,14 @@
 </property>
 
 <property>
+  <name>mapred.reduce.parallel.copies</name>
+  <value>5</value>
+  <description>The default number of parallel transfers run by reduce
+  during the copy(shuffle) phase.
+  </description>
+</property>
+
+<property>
   <name>mapred.task.timeout</name>
   <value>600000</value>
   <description>The number of milliseconds before a task will be

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=409261&r1=409260&r2=409261&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java Wed May 24 15:36:55 2006
@@ -23,6 +23,7 @@
 import java.lang.reflect.InvocationTargetException;
 
 import java.net.InetSocketAddress;
+import java.net.Socket;
 import java.util.logging.*;
 import java.io.*;
 
@@ -184,6 +185,64 @@
         values[i] = ((ObjectWritable)wrappedValues[i]).get();
     
     return values;
+  }
+
+  
+  /** Expert: Make an RPC call over the specified socket. Assumes that no other calls 
+   * are in flight on this connection. */ 
+  public static Object callRaw(Method method, Object[] params,
+                               Socket sock, Configuration conf)
+    throws IOException {  
+    
+    Invocation inv = new Invocation(method, params);    
+    DataInputStream in =
+      new DataInputStream(new BufferedInputStream(sock.getInputStream()));              
+    DataOutputStream out = 
+      new DataOutputStream(new BufferedOutputStream(sock.getOutputStream()));    
+    String name = new String("Client connection to " +
+                             sock.getInetAddress().getHostName() +
+                             ":" + sock.getPort());
+    
+    try {
+      if (LOG.isLoggable(Level.FINE)) {
+        LOG.fine(name + " sending #0");
+      }
+ 
+      // write out method invocation
+      out.writeInt(0);
+      inv.write(out);
+      out.flush();
+      
+      // read return value
+      int callId = in.readInt();
+      
+      if (LOG.isLoggable(Level.FINE)) {
+        LOG.fine(name + " got response to call #" + callId);
+      }
+      
+      boolean isError = in.readBoolean();
+      if (isError) {
+        throw new RemoteException(WritableUtils.readString(in),
+                                  WritableUtils.readString(in));
+      }
+      else {
+
+        Writable wrappedValue = (Writable)ObjectWritable.class.newInstance();        
+        if (wrappedValue instanceof Configurable) {
+          ((Configurable) wrappedValue).setConf(conf);
+        }
+        wrappedValue.readFields(in);
+
+        return method.getReturnType() != Void.TYPE ?
+          ((ObjectWritable)wrappedValue).get() : null;
+      }
+    }
+    catch (InstantiationException e) {
+      throw new IOException(e.toString());
+    }
+    catch (IllegalAccessException e) {
+      throw new IOException(e.toString());
+    }
   }
   
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=409261&r1=409260&r2=409261&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Wed May 24 15:36:55 2006
@@ -101,12 +101,27 @@
     public void run() {
       LOG.info(getName() + ": starting");
       while (running) {
+        Socket acceptedSock = null;
         try {
-          new Connection(socket.accept()).start(); // start a new connection
+          acceptedSock = socket.accept();
+          new Connection(acceptedSock).start(); // start a new connection
         } catch (SocketTimeoutException e) {      // ignore timeouts
-        } catch (Exception e) {                   // log all other exceptions
-          LOG.log(Level.INFO, getName() + " caught: " + e, e);
+        } catch (OutOfMemoryError e) {
+          // we can run out of memory if we have too many threads
+          // log the event and sleep for a minute and give 
+          // some thread(s) a chance to finish
+          LOG.log(Level.WARNING,
+                  getName() + " out of memory, sleeping...", e);          
+          try {
+            acceptedSock.close();
+            Thread.sleep(60000);
+          } catch (InterruptedException ie) { // ignore interrupts
+          } catch (IOException ioe) { // ignore IOexceptions
+          }          
         }
+        catch (Exception e) {           // log all other exceptions
+          LOG.log(Level.INFO, getName() + " caught: " + e, e);
+        }        
       }
       try {
         socket.close();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java?rev=409261&r1=409260&r2=409261&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java Wed May 24 15:36:55 2006
@@ -41,6 +41,7 @@
   private String reduceTaskId;
   private int mapId;
   private int partition;
+  private long size;
   
   /** Permits reporting of file copy progress. */
   public interface ProgressReporter {
@@ -101,6 +102,10 @@
   private FileSystem getLocalFs() throws IOException {
     return FileSystem.getNamed("local", this.jobConf);
   }
+  
+  public long getSize() {
+    return size;
+  }
 
   public void write(DataOutput out) throws IOException {
     UTF8.writeString(out, mapTaskId);
@@ -112,7 +117,8 @@
     FSDataInputStream in = null;
     try {
       // write the length-prefixed file content to the wire
-      out.writeLong(getLocalFs().getLength(file));
+      this.size = getLocalFs().getLength(file);
+      out.writeLong(this.size);
       in = getLocalFs().open(file);
     } catch (FileNotFoundException e) {
       TaskTracker.LOG.log(Level.SEVERE, "Can't open map output:" + file, e);
@@ -120,7 +126,7 @@
       throw e;
     }
     try {
-      byte[] buffer = new byte[8192];
+      byte[] buffer = new byte[65536];
       int l  = 0;
       
       while (l != -1) {
@@ -149,11 +155,13 @@
     // read the length-prefixed file content into a local file
     Path file = getInputFile(mapId, reduceTaskId);
     long length = in.readLong();
+    this.size = length;
+    
     float progPerByte = 1.0f / length;
     long unread = length;
     FSDataOutputStream out = getLocalFs().create(file);
     try {
-      byte[] buffer = new byte[8192];
+      byte[] buffer = new byte[65536];
       while (unread > 0) {
           int bytesToRead = (int)Math.min(unread, buffer.length);
           in.readFully(buffer, 0, bytesToRead);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?rev=409261&r1=409260&r2=409261&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Wed May 24 15:36:55 2006
@@ -21,122 +21,423 @@
 import java.io.*;
 import java.net.*;
 import java.util.*;
-import java.util.logging.*;
+import java.lang.reflect.Method;
+import java.text.DecimalFormat;
+
 
 /** Runs a reduce task. */
 class ReduceTaskRunner extends TaskRunner {
+  
+  /** 
+   * for cleaning up old map outputs
+   */
   private MapOutputFile mapOutputFile;
+  
+  /**
+   * our reduce task instance
+   */
+  private ReduceTask reduceTask;
+    
+  /**
+   * the list of map outputs currently being copied
+   */
+  private List scheduledCopies;
+  
+  /**
+   *  the results of dispatched copy attempts
+   */
+  private List copyResults;
+  
+  /**
+   *  the number of outputs to copy in parallel
+   */
+  private int numCopiers;
+
+  /**
+   * timeout for copy operations
+   */
+  private int copyTimeout;
+  
+  /**
+   * the maximum amount of time in seconds (less 1 minute) to wait to
+   * contact a host after a copy from it fails. We wait for 1 min plus
+   * a random delay of up to maxBackoff seconds.
+   */
+  private int maxBackoff;
+  
+  /**
+   * busy hosts from which copies are being backed off
+   * Map of host -> next contact time
+   */
+  private Map penaltyBox;
+
+  /**
+   * the set of unique hosts from which we are copying
+   */
+  private Set uniqueHosts;
+  
+  /**
+   * the last time we polled the job tracker
+   */
+  private long lastPollTime;
+  
+  /**
+   * the minimum interval between jobtracker polls
+   */
+  private static final long MIN_POLL_INTERVAL = 5000;
+  
+  /**
+   * the number of map output locations to poll for at one time
+   */
+  private static final int PROBE_SAMPLE_SIZE = 50;
+
+  // initialization code to resolve "getFile" to a method object
+  private static Method getFileMethod = null;
+  static {
+    Class[] paramTypes = { String.class, String.class,
+                           int.class, int.class };    
+    try {
+      getFileMethod = 
+        MapOutputProtocol.class.getDeclaredMethod("getFile", paramTypes);
+    }
+    catch (NoSuchMethodException e) {
+      LOG.severe(StringUtils.stringifyException(e));
+      throw new RuntimeException("Can't find \"getFile\" method "
+                                 + "of MapOutputProtocol", e);
+    }
+  }
+  
+  /** Represents the result of an attempt to copy a map output */
+  private class CopyResult {
+    
+    // the map output location against which a copy attempt was made
+    private final MapOutputLocation loc;
+    
+    // the size of the file copied, -1 if the transfer failed
+    private final long size;
+        
+    CopyResult(MapOutputLocation loc, long size) {
+      this.loc = loc;
+      this.size = size;
+    }
+    
+    public int getMapId() { return loc.getMapId(); }
+    public boolean getSuccess() { return size >= 0; }
+    public long getSize() { return size; }
+    public String getHost() { return loc.getHost(); }
+    public MapOutputLocation getLocation() { return loc; }
+  }
+  
+  /** Copies map outputs as they become available */
+  private class MapOutputCopier extends Thread {
 
+    public MapOutputCopier() {
+    }
+    
+    /** Loop forever and fetch map outputs as they become available.
+     * The thread exits when it is interrupted by the {@link ReduceTaskRunner}
+     */
+    public void run() {
+      try {
+        while (true) {        
+          MapOutputLocation loc = null;
+          long size = -1;
+          
+          synchronized (scheduledCopies) {
+            while (scheduledCopies.isEmpty()) {
+              scheduledCopies.wait();
+            }
+            loc = (MapOutputLocation)scheduledCopies.remove(0);
+          }
+
+          try {
+            size = copyOutput(loc);
+          } catch (IOException e) {
+            LOG.warning(reduceTask.getTaskId() + " copy failed: " +
+                        loc.getMapTaskId() + " from " + loc.getHost());
+            LOG.warning(StringUtils.stringifyException(e));
+          }
+          
+          synchronized (copyResults) {
+            copyResults.add(new CopyResult(loc, size));
+            copyResults.notifyAll();
+          }
+        }
+      } catch (InterruptedException e) { }  // ALL DONE!
+    }
+
+    /** Copies a map output from a remote host, using raw RPC. 
+     * @param loc the map output location to be copied
+     * @return the size of the copied file
+     * @throws IOException if there is an error copying the file
+     */
+    private long copyOutput(MapOutputLocation loc)
+    throws IOException {
+
+      Object[] params = new Object[4];
+      params[0] = loc.getMapTaskId();
+      params[1] = reduceTask.getTaskId();
+      params[2] = new Integer(loc.getMapId());
+      params[3] = new Integer(reduceTask.getPartition());
+      
+      LOG.info(reduceTask.getTaskId() + " copy started: " +
+               loc.getMapTaskId() + " from " + loc.getHost());
+
+      Socket sock = new Socket(loc.getHost(), loc.getPort());
+      try {
+        sock.setSoTimeout(copyTimeout);
+
+        // this copies the map output file
+        MapOutputFile file =
+          (MapOutputFile)RPC.callRaw(getFileMethod, params, sock, conf);
+
+        LOG.info(reduceTask.getTaskId() + " copy finished: " +
+                 loc.getMapTaskId() + " from " + loc.getHost());      
+
+        return file.getSize();
+      }
+      finally {
+        try {
+          sock.close();
+        } catch (IOException e) { } // IGNORE
+      }
+    }
+
+  }
+  
   public ReduceTaskRunner(Task task, TaskTracker tracker, JobConf conf) {
     super(task, tracker, conf);
     this.mapOutputFile = new MapOutputFile();
     this.mapOutputFile.setConf(conf);
+
+    this.reduceTask = (ReduceTask)getTask();
+    this.scheduledCopies = new ArrayList(100);
+    this.copyResults = new ArrayList(100);    
+    this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
+    this.copyTimeout = conf.getInt("ipc.client.timeout", 10000);
+    this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
+
+    // hosts -> next contact time
+    this.penaltyBox = new Hashtable();
+    
+    // hostnames
+    this.uniqueHosts = new HashSet();
+    
+    this.lastPollTime = 0;
   }
 
-  /** Assemble all of the map output files. */
+  /** Assemble all of the map output files */
   public boolean prepare() throws IOException {
-    ReduceTask task = ((ReduceTask)getTask());
-    this.mapOutputFile.removeAll(task.getTaskId());    // cleanup from failures
-    int numMaps = task.getNumMaps();
+    
+    // cleanup from failures
+    this.mapOutputFile.removeAll(reduceTask.getTaskId());
+    
+    final int      numOutputs = reduceTask.getNumMaps();
+    List           neededOutputs = new ArrayList(numOutputs);
+    List           knownOutputs = new ArrayList(100);
+    int            numInFlight = 0, numCopied = 0;
+    int            lowThreshold = numCopiers*2;
+    long           bytesTransferred = 0;
+    DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
+    Random         backoff = new Random();
     final Progress copyPhase = getTask().getProgress().phase();
-
-    // we need input from every map task
-    List needed = new ArrayList(numMaps);
-    for (int i = 0; i < numMaps; i++) {
-      needed.add(new Integer(i));
-      copyPhase.addPhase();                       // add sub-phase per file
+    
+    for (int i = 0; i < numOutputs; i++) {
+      neededOutputs.add(new Integer(i));
+      copyPhase.addPhase();       // add sub-phase per file
     }
 
     InterTrackerProtocol jobClient = getTracker().getJobClient();
-    while (needed.size() > 0) {
-      LOG.info(task.getTaskId()+" Need "+needed.size()+" map output(s).");
-      getTask().reportProgress(getTracker());
-
-      // query for a just a random subset of needed segments so that we don't
-      // overwhelm jobtracker.  ideally perhaps we could send a more compact
-      // representation of all needed, i.e., a bit-vector
-      int checkSize = Math.min(10, needed.size());
-      int[] neededIds = new int[checkSize];
-      Collections.shuffle(needed);
-      ListIterator itr = needed.listIterator();
-      for (int i = 0; i < checkSize; i++) {
-        neededIds[i] = ((Integer) itr.next()).intValue();
-      }
-      MapOutputLocation[] locs = null;
-      try {
-        locs = jobClient.locateMapOutputs(task.getJobId().toString(), 
-                                          neededIds, task.getPartition());
-      } catch (IOException ie) {
-        LOG.info("Problem locating map outputs: " + 
-                 StringUtils.stringifyException(ie));
-      }
-      if (locs == null || locs.length == 0) {
-        try {
-          if (killed) {
-            return false;
-          }
-          LOG.info(task.getTaskId()+" No map outputs available; sleeping...");
-          Thread.sleep(10000);
-        } catch (InterruptedException e) {
-        }
-        continue;
-      }
-
-      LOG.info(task.getTaskId()+" Got "+locs.length+" map output locations.");
-
-      // try each of these locations
-      for (int i = 0; i < locs.length; i++) {
-        MapOutputLocation loc = locs[i];
-        InetSocketAddress addr =
-          new InetSocketAddress(loc.getHost(), loc.getPort());
-        MapOutputProtocol client =
-          (MapOutputProtocol)RPC.getProxy(MapOutputProtocol.class, addr, this.conf);
-
-        this.mapOutputFile.setProgressReporter(new MapOutputFile.ProgressReporter() {
-            public void progress(float progress) {
-              copyPhase.phase().set(progress);
-              try {
-                getTask().reportProgress(getTracker());
-              } catch (IOException e) {
-                throw new RuntimeException(e);
-              }
-            }
-          });
+    MapOutputCopier[] copiers = new MapOutputCopier[numCopiers];
 
-        getTask().reportProgress(getTracker());
+    // start all the copying threads
+    for (int i=0; i < copiers.length; i++) {
+      copiers[i] = new MapOutputCopier();
+      copiers[i].start();
+    }
+    
+    // start the clock for bandwidth measurement
+    long startTime = System.currentTimeMillis();
+    long currentTime = startTime;
+    
+    // loop until we get all required outputs or are killed
+    while (!killed && numCopied < numOutputs) {
+
+      LOG.info(reduceTask.getTaskId() + " Need " + (numOutputs-numCopied) +
+               " map output(s)");
+
+      if (!neededOutputs.isEmpty()) {
+        LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
+                 " map output location(s)");
         try {
-          copyPhase.phase().setStatus(loc.toString());
+          MapOutputLocation[] locs = queryJobTracker(neededOutputs, jobClient);
           
-          LOG.info(task.getTaskId()+" Copying "+loc.getMapTaskId()
-                   +" output from "+loc.getHost()+".");
-          client.getFile(loc.getMapTaskId(), task.getTaskId(),
-                         loc.getMapId(),
-                         task.getPartition());
-
-          // Success: remove from 'needed'
-          for (Iterator it = needed.iterator(); it.hasNext(); ) {
-              int mapId = ((Integer) it.next()).intValue();
-              if (mapId == loc.getMapId()) {
-                it.remove();
-                break;
-              }
+          // remove discovered outputs from needed list and put them on
+          // the known list; locs may be null, as the log call below allows
+          for (int i=0; locs != null && i < locs.length; i++) {
+            neededOutputs.remove(new Integer(locs[i].getMapId()));
+            knownOutputs.add(locs[i]);
+          }
+          LOG.info(reduceTask.getTaskId() +
+                   " Got " + (locs == null ? 0 : locs.length) + 
+                   " map outputs from jobtracker");
+        }
+        catch (IOException ie) {
+          LOG.warning(reduceTask.getTaskId() +
+                      " Problem locating map outputs: " +
+                      StringUtils.stringifyException(ie));
+        }
+      }
+      
+      // now walk through the cache and schedule what we can
+      int numKnown = knownOutputs.size(), numScheduled = 0;
+      int numSlow = 0, numDups = 0;
+
+      LOG.info(reduceTask.getTaskId() + " Got " + numKnown + 
+               " known map output location(s); scheduling...");
+
+      synchronized (scheduledCopies) {
+        ListIterator locIt = knownOutputs.listIterator();
+
+        currentTime = System.currentTimeMillis();
+        while (locIt.hasNext()) {
+
+          MapOutputLocation loc = (MapOutputLocation)locIt.next();
+          Long penaltyEnd = (Long)penaltyBox.get(loc.getHost());
+          boolean penalized = false, duplicate = false;
+ 
+          if (penaltyEnd != null && currentTime < penaltyEnd.longValue()) {
+            penalized = true; numSlow++;
           }
+          if (uniqueHosts.contains(loc.getHost())) {
+            duplicate = true; numDups++;
+          }
+ 
+          if (!penalized && !duplicate) {
+            uniqueHosts.add(loc.getHost());
+            scheduledCopies.add(loc);
+            locIt.remove();  // remove from knownOutputs
+            numInFlight++; numScheduled++;
+          }
+        }
+        scheduledCopies.notifyAll();
+      }
+      LOG.info(reduceTask.getTaskId() + " Scheduled " + numScheduled +
+               " of " + numKnown + " known outputs (" + numSlow +
+               " slow hosts and " + numDups + " dup hosts)");
+
+      // if we have no copies in flight and we can't schedule anything
+      // new, just wait for a bit
+      try {
+        if (numInFlight == 0 && numScheduled == 0) {
+          Thread.sleep(5000);
+        }
+      } catch (InterruptedException e) { } // IGNORE
+
+      while (!killed && numInFlight > 0) {
+        CopyResult cr = getCopyResult();
+        
+        if (cr.getSuccess()) {  // a successful copy
+          numCopied++;
+          bytesTransferred += cr.getSize();
+          
+          long secsSinceStart = (System.currentTimeMillis()-startTime)/1000+1;
+          float mbs = ((float)bytesTransferred)/(1024*1024);
+          float transferRate = mbs/secsSinceStart;
+          
           copyPhase.startNextPhase();
+          copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs + " at " +
+                              mbpsFormat.format(transferRate) +  " MB/s)");          
+          getTask().reportProgress(getTracker());
+        }
+        else {
+          // this copy failed, put it back onto neededOutputs
+          neededOutputs.add(new Integer(cr.getMapId()));
           
-        } catch (IOException e) {                 // failed: try again later
-          LOG.log(Level.WARNING,
-                  task.getTaskId()+" copy failed: "
-                  +loc.getMapTaskId()+" from "+addr,
-                  e);
-        } finally {
-          this.mapOutputFile.setProgressReporter(null);
+          // wait a random amount of time for next contact
+          currentTime = System.currentTimeMillis();
+          long nextContact = currentTime + 60 * 1000 +
+                             backoff.nextInt(maxBackoff*1000);
+          penaltyBox.put(cr.getHost(), new Long(nextContact));          
+          LOG.warning(reduceTask.getTaskId() + " adding host " +
+                      cr.getHost() + " to penalty box, next contact in " +
+                      ((nextContact-currentTime)/1000) + " seconds");
+        }
+        uniqueHosts.remove(cr.getHost());
+        numInFlight--;
+        
+        // ensure we have enough to keep us busy
+        if (numInFlight < lowThreshold && (numOutputs-numCopied) > PROBE_SAMPLE_SIZE) {
+          break;
         }
       }
+      
+    }
+
+    // all done, inform the copiers to exit
+    synchronized (scheduledCopies) {
+     for (int i=0; i < copiers.length; i++) {
+       copiers[i].interrupt();
+      }
+    }
+    
+    return numCopied == numOutputs && !killed;
+  }
+  
+  
+  private CopyResult getCopyResult() {  
+    CopyResult cr = null;
+    
+    synchronized (copyResults) {
+      while (copyResults.isEmpty()) {
+        try {
+          copyResults.wait();
+        } catch (InterruptedException e) { }
+      }
+      cr = (CopyResult)copyResults.remove(0);      
+    }    
+    return cr;
+  }
+  
+  /** Queries the job tracker for a set of outputs ready to be copied
+   * @param neededOutputs the list of currently unknown outputs
+   * @param jobClient the job tracker
+   * @return a set of locations to copy outputs from
+   * @throws IOException if the call to the job tracker fails
+   */  
+  private MapOutputLocation[] queryJobTracker(List neededOutputs,
+                                              InterTrackerProtocol jobClient)
+  throws IOException {
+    
+    // query for just a random subset of needed segments so that we don't
+    // overwhelm jobtracker.  ideally perhaps we could send a more compact
+    // representation of all needed, i.e., a bit-vector
+    int     checkSize = Math.min(PROBE_SAMPLE_SIZE, neededOutputs.size());
+    int     neededIds[] = new int[checkSize];
+      
+    Collections.shuffle(neededOutputs);
+      
+    ListIterator itr = neededOutputs.listIterator();
+    for (int i=0; i < checkSize; i++) {
+      neededIds[i] = ((Integer)itr.next()).intValue();
     }
-    getTask().reportProgress(getTracker());
-    return true;
+    // wait until MIN_POLL_INTERVAL has passed since the previous poll
+    long currentTime = System.currentTimeMillis();    
+    long pollTime = lastPollTime + MIN_POLL_INTERVAL;
+    while (currentTime < pollTime) {
+      try {
+        Thread.sleep(pollTime-currentTime);
+      } catch (InterruptedException ie) { } // IGNORE
+      currentTime = System.currentTimeMillis();
+    }
+    lastPollTime = currentTime;  // record when this poll actually happens
+
+    return jobClient.locateMapOutputs(reduceTask.getJobId().toString(), 
+                                      neededIds,
+                                      reduceTask.getPartition());
   }
 
+  
   /** Delete all of the temporary map output files. */
   public void close() throws IOException {
     getTask().getProgress().setStatus("closed");

Modified: lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp?rev=409261&r1=409260&r2=409261&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp Wed May 24 15:36:55 2006
@@ -90,7 +90,10 @@
 %>
 
 <html>
+<head>
+<meta http-equiv="refresh" content=60>
 <title>Hadoop <%=jobid%> on <%=trackerName%></title>
+</head>
 <body>
 <h1>Hadoop <%=jobid%> on <a href="/jobtracker.jsp"><%=trackerName%></a></h1>