You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/06/01 21:51:51 UTC

svn commit: r410929 - in /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred: MapOutputLocation.java ReduceTaskRunner.java

Author: cutting
Date: Thu Jun  1 12:51:50 2006
New Revision: 410929

URL: http://svn.apache.org/viewvc?rev=410929&view=rev
Log:
HADOOP-259.  Fix HTTP transfers of map output to time out.  Contributed by Owen.

Modified:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java?rev=410929&r1=410928&r2=410929&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java Thu Jun  1 12:51:50 2006
@@ -91,15 +91,26 @@
   }
 
   /**
+   * An interface for callbacks when a method makes some progress.
+   * @author Owen O'Malley
+   */
+  public static interface Pingable {
+    void ping();
+  }
+  
+  /**
    * Get the map output into a local file from the remote server.
    * We use the file system so that we generate checksum files on the data.
    * @param fileSys the filesystem to write the file to
    * @param localFilename the filename to write the data into
    * @param reduce the reduce id to get for
+   * @param pingee a status object that wants to know when we make progress
    * @throws IOException when something goes wrong
    */
   public long getFile(FileSystem fileSys, 
-                      Path localFilename, int reduce) throws IOException {
+                      Path localFilename, 
+                      int reduce,
+                      Pingable pingee) throws IOException {
     URL path = new URL(toString() + "&reduce=" + reduce);
     InputStream input = path.openConnection().getInputStream();
     OutputStream output = fileSys.create(localFilename);
@@ -110,6 +121,9 @@
       while (len > 0) {
         totalBytes += len;
         output.write(buffer, 0 ,len);
+        if (pingee != null) {
+          pingee.ping();
+        }
         len = input.read(buffer);
       }
     } finally {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?rev=410929&r1=410928&r2=410929&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Thu Jun  1 12:51:50 2006
@@ -78,6 +78,11 @@
    * A reference to the local file system for writing the map outputs to.
    */
   private FileSystem localFileSys;
+
+  /**
+   * The threads for fetching the files.
+   */
+  private MapOutputCopier[] copiers = null;
   
   /**
    * the minimum interval between jobtracker polls
@@ -109,13 +114,76 @@
     public String getHost() { return loc.getHost(); }
     public MapOutputLocation getLocation() { return loc; }
   }
+
+  private static class PingTimer implements MapOutputLocation.Pingable {
+    private long pingTime;
+    
+    public synchronized void reset() {
+      pingTime = 0;
+    }
+    
+    public synchronized long getLastPing() {
+      return pingTime;
+    }
+    
+    public void ping() {
+      synchronized (this) {
+        pingTime = System.currentTimeMillis();
+      }
+    }
+  }
   
   /** Copies map outputs as they become available */
   private class MapOutputCopier extends Thread {
 
+    private PingTimer pingTimer = new PingTimer();
+    private MapOutputLocation currentLocation = null;
+    
     public MapOutputCopier() {
     }
     
+    /**
+     * Get the last time that this copier made progress.
+     * @return the System.currentTimeMillis when this copier last made progress
+     */
+    public long getLastProgressTime() {
+      return pingTimer.getLastPing();
+    }
+    
+    /**
+     * Fail the current file that we are fetching
+     * @return were we currently fetching?
+     */
+    public synchronized boolean fail() {
+      if (currentLocation != null) {
+        finish(-1);
+        return true;
+      } else {
+        return false;
+      }
+    }
+    
+    /**
+     * Get the current map output location.
+     */
+    public synchronized MapOutputLocation getLocation() {
+      return currentLocation;
+    }
+    
+    private synchronized void start(MapOutputLocation loc) {
+      currentLocation = loc;
+    }
+    
+    private synchronized void finish(long size) {
+      if (currentLocation != null) {
+        synchronized (copyResults) {
+          copyResults.add(new CopyResult(currentLocation, size));
+          copyResults.notify();
+        }
+        currentLocation = null;
+      }
+    }
+    
     /** Loop forever and fetch map outputs as they become available.
      * The thread exits when it is interrupted by the {@link ReduceTaskRunner}
      */
@@ -133,27 +201,28 @@
           }
 
           try {
-            size = copyOutput(loc);
+            start(loc);
+            pingTimer.ping();
+            size = copyOutput(loc, pingTimer);
+            pingTimer.reset();
           } catch (IOException e) {
             LOG.warning(reduceTask.getTaskId() + " copy failed: " +
                         loc.getMapTaskId() + " from " + loc.getHost());
             LOG.warning(StringUtils.stringifyException(e));
           }
-          
-          synchronized (copyResults) {
-            copyResults.add(new CopyResult(loc, size));
-            copyResults.notifyAll();
-          }
+          finish(size);
         }
       } catch (InterruptedException e) { }  // ALL DONE!
     }
 
     /** Copies a a map output from a remote host, using raw RPC. 
-     * @param loc the map output location to be copied
+     * @param currentLocation the map output location to be copied
+     * @param pingee a status object to ping as we make progress
      * @return the size of the copied file
      * @throws IOException if there is an error copying the file
      */
-    private long copyOutput(MapOutputLocation loc)
+    private long copyOutput(MapOutputLocation loc, 
+                            MapOutputLocation.Pingable pingee)
     throws IOException {
 
       String reduceId = reduceTask.getTaskId();
@@ -165,7 +234,7 @@
         Path filename = conf.getLocalPath(reduceId + "/map_" +
                                           loc.getMapId() + ".out");
         long bytes = loc.getFile(localFileSys, filename,
-                                 reduceTask.getPartition());
+                                 reduceTask.getPartition(), pingee);
 
         LOG.info(reduceTask.getTaskId() + " done copying " + loc.getMapTaskId() +
                  " output from " + loc.getHost() + ".");
@@ -181,6 +250,46 @@
 
   }
   
+  private class MapCopyLeaseChecker extends Thread {
+    private static final long STALLED_COPY_TIMEOUT = 3 * 60 * 1000;
+    private static final long STALLED_COPY_CHECK = 60 * 1000;
+    private long lastStalledCheck = 0;
+    
+    public void run() {
+      try {
+        while (true) {
+          long currentTime = System.currentTimeMillis();
+          if (currentTime - lastStalledCheck > STALLED_COPY_CHECK) {
+            lastStalledCheck = currentTime;
+            synchronized (copiers) {
+              for(int i=0; i < copiers.length; ++i) {
+                if (copiers[i] == null) {
+                  break;
+                }
+                long lastProgress = copiers[i].getLastProgressTime();
+                if (lastProgress != 0 && 
+                    currentTime - lastProgress > STALLED_COPY_TIMEOUT)  {
+                  LOG.warning("Map output copy stalled on " + 
+                              copiers[i].getLocation());
+                  // mark the current file as failed
+                  copiers[i].fail();
+                  // tell the thread to stop
+                  copiers[i].interrupt();
+                  // create a replacement thread
+                  copiers[i] = new MapOutputCopier();
+                  copiers[i].start();
+                }
+              }
+            }
+          } else {
+            Thread.sleep(lastStalledCheck + STALLED_COPY_CHECK - currentTime);
+          }
+        }
+      } catch (InterruptedException ie) {}
+      
+    }
+  }
+
   public ReduceTaskRunner(Task task, TaskTracker tracker, 
                           JobConf conf) throws IOException {
     super(task, tracker, conf);
@@ -218,6 +327,7 @@
     DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
     Random         backoff = new Random();
     final Progress copyPhase = getTask().getProgress().phase();
+    MapCopyLeaseChecker leaseChecker = null;
     
     for (int i = 0; i < numOutputs; i++) {
       neededOutputs.add(new Integer(i));
@@ -225,13 +335,15 @@
     }
 
     InterTrackerProtocol jobClient = getTracker().getJobClient();
-    MapOutputCopier[] copiers = new MapOutputCopier[numCopiers];
+    copiers = new MapOutputCopier[numCopiers];
 
     // start all the copying threads
     for (int i=0; i < copiers.length; i++) {
       copiers[i] = new MapOutputCopier();
       copiers[i].start();
     }
+    leaseChecker = new MapCopyLeaseChecker();
+    leaseChecker.start();
     
     // start the clock for bandwidth measurement
     long startTime = System.currentTimeMillis();
@@ -314,34 +426,36 @@
       while (!killed && numInFlight > 0) {
         CopyResult cr = getCopyResult();
         
-        if (cr.getSuccess()) {  // a successful copy
-          numCopied++;
-          bytesTransferred += cr.getSize();
+        if (cr != null) {
+          if (cr.getSuccess()) {  // a successful copy
+            numCopied++;
+            bytesTransferred += cr.getSize();
           
-          long secsSinceStart = (System.currentTimeMillis()-startTime)/1000+1;
-          float mbs = ((float)bytesTransferred)/(1024*1024);
-          float transferRate = mbs/secsSinceStart;
+            long secsSinceStart = (System.currentTimeMillis()-startTime)/1000+1;
+            float mbs = ((float)bytesTransferred)/(1024*1024);
+            float transferRate = mbs/secsSinceStart;
           
-          copyPhase.startNextPhase();
-          copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs + " at " +
-                              mbpsFormat.format(transferRate) +  " MB/s)");          
-          getTask().reportProgress(getTracker());
-        }
-        else {
-          // this copy failed, put it back onto neededOutputs
-          neededOutputs.add(new Integer(cr.getMapId()));
+            copyPhase.startNextPhase();
+            copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs + 
+                                " at " +
+                                mbpsFormat.format(transferRate) +  " MB/s)");          
+            getTask().reportProgress(getTracker());
+          } else {
+            // this copy failed, put it back onto neededOutputs
+            neededOutputs.add(new Integer(cr.getMapId()));
           
-          // wait a random amount of time for next contact
-          currentTime = System.currentTimeMillis();
-          long nextContact = currentTime + 60 * 1000 +
-                             backoff.nextInt(maxBackoff*1000);
-          penaltyBox.put(cr.getHost(), new Long(nextContact));          
-          LOG.warning(reduceTask.getTaskId() + " adding host " +
-                      cr.getHost() + " to penalty box, next contact in " +
-                      ((nextContact-currentTime)/1000) + " seconds");
+            // wait a random amount of time for next contact
+            currentTime = System.currentTimeMillis();
+            long nextContact = currentTime + 60 * 1000 +
+                               backoff.nextInt(maxBackoff*1000);
+            penaltyBox.put(cr.getHost(), new Long(nextContact));          
+            LOG.warning(reduceTask.getTaskId() + " adding host " +
+                        cr.getHost() + " to penalty box, next contact in " +
+                        ((nextContact-currentTime)/1000) + " seconds");
+          }
+          uniqueHosts.remove(cr.getHost());
+          numInFlight--;
         }
-        uniqueHosts.remove(cr.getHost());
-        numInFlight--;
         
         // ensure we have enough to keep us busy
         if (numInFlight < lowThreshold && (numOutputs-numCopied) > PROBE_SAMPLE_SIZE) {
@@ -352,9 +466,13 @@
     }
 
     // all done, inform the copiers to exit
-    synchronized (scheduledCopies) {
-     for (int i=0; i < copiers.length; i++) {
-       copiers[i].interrupt();
+    leaseChecker.interrupt();
+    synchronized (copiers) {
+      synchronized (scheduledCopies) {
+        for (int i=0; i < copiers.length; i++) {
+          copiers[i].interrupt();
+          copiers[i] = null;
+        }
       }
     }
     
@@ -363,17 +481,18 @@
   
   
   private CopyResult getCopyResult() {  
-    CopyResult cr = null;
-    
-    synchronized (copyResults) {
-      while (copyResults.isEmpty()) {
+   synchronized (copyResults) {
+      while (!killed && copyResults.isEmpty()) {
         try {
           copyResults.wait();
         } catch (InterruptedException e) { }
       }
-      cr = (CopyResult)copyResults.remove(0);      
+      if (copyResults.isEmpty()) {
+        return null;
+      } else {
+        return (CopyResult) copyResults.remove(0);
+      }
     }    
-    return cr;
   }
   
   /** Queries the job tracker for a set of outputs ready to be copied
@@ -419,6 +538,17 @@
   public void close() throws IOException {
     getTask().getProgress().setStatus("closed");
     this.mapOutputFile.removeAll(getTask().getTaskId());
+  }
+
+  /**
+   * Kill the child process, but also kick getCopyResult so that it checks
+   * the kill flag.
+   */
+  public void kill() {
+    synchronized (copyResults) {
+      super.kill();
+      copyResults.notify();
+    }
   }
 
 }