You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/12/01 23:26:33 UTC

svn commit: r481429 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java src/java/org/apache/hadoop/mapred/TaskRunner.java

Author: cutting
Date: Fri Dec  1 14:26:30 2006
New Revision: 481429

URL: http://svn.apache.org/viewvc?view=rev&rev=481429
Log:
HADOOP-750.  Fix a potential race condition during the mapreduce shuffle.  Contributed by Owen O'Malley.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=481429&r1=481428&r2=481429
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Dec  1 14:26:30 2006
@@ -143,6 +143,9 @@
 42. HADOOP-430.  Stop datanode's HTTP server when registration with
     namenode fails.  (Wendy Chien via cutting)
 
+43. HADOOP-750.  Fix a potential race condition during mapreduce
+    shuffle.  (omalley via cutting)
+
 
 Release 0.8.0 - 2006-11-03
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?view=diff&rev=481429&r1=481428&r2=481429
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Fri Dec  1 14:26:30 2006
@@ -121,24 +121,14 @@
   }
 
   private class PingTimer implements Progressable {
-    private long pingTime;
-    
-    public synchronized void reset() {
-      pingTime = 0;
-    }
-    
-    public synchronized long getLastPing() {
-      return pingTime;
-    }
+    Task task = getTask();
+    TaskTracker tracker = getTracker();
     
     public void progress() {
-      synchronized (this) {
-        pingTime = System.currentTimeMillis();
-        getTask().reportProgress(getTracker());
-      }
+      task.reportProgress(tracker);
     }
   }
-  
+
   private static int nextMapOutputCopierId = 0;
 
   /** Copies map outputs as they become available */
@@ -149,14 +139,8 @@
     private int id = nextMapOutputCopierId++;
     
     public MapOutputCopier() {
-    }
-    
-    /**
-     * Get the last time that this copier made progress.
-     * @return the System.currentTimeMillis when this copier last made progress
-     */
-    public long getLastProgressTime() {
-      return pingTimer.getLastPing();
+      setName("MapOutputCopier " + reduceTask.getTaskId() + "." + id);
+      LOG.debug(getName() + " created");
     }
     
     /**
@@ -185,6 +169,7 @@
     
     private synchronized void finish(long size) {
       if (currentLocation != null) {
+        LOG.debug(getName() + " finishing " + currentLocation + " = " + size);
         synchronized (copyResults) {
           copyResults.add(new CopyResult(currentLocation, size));
           copyResults.notify();
@@ -211,15 +196,14 @@
 
           try {
             start(loc);
-            pingTimer.progress();
             size = copyOutput(loc, pingTimer);
-            pingTimer.reset();
           } catch (IOException e) {
             LOG.warn(reduceTask.getTaskId() + " copy failed: " +
                         loc.getMapTaskId() + " from " + loc.getHost());
             LOG.warn(StringUtils.stringifyException(e));
+          } finally {
+            finish(size);
           }
-          finish(size);
         } catch (InterruptedException e) { 
           return; // ALL DONE
         } catch (Throwable th) {
@@ -268,49 +252,6 @@
     }
 
   }
-  
-  private class MapCopyLeaseChecker extends Thread {
-    private static final long STALLED_COPY_CHECK = 60 * 1000;
-    private long lastStalledCheck = 0;
-    
-    public void run() {
-      while (true) {
-        try {
-          long currentTime = System.currentTimeMillis();
-          if (currentTime - lastStalledCheck > STALLED_COPY_CHECK) {
-            lastStalledCheck = currentTime;
-            synchronized (copiers) {
-              for(int i=0; i < copiers.length; ++i) {
-                if (copiers[i] == null) {
-                  break;
-                }
-                long lastProgress = copiers[i].getLastProgressTime();
-                if (lastProgress != 0 && 
-                    currentTime - lastProgress > STALLED_COPY_TIMEOUT)  {
-                  LOG.warn("Map output copy stalled on " +
-                           copiers[i].getLocation());
-                  // mark the current file as failed
-                  copiers[i].fail();
-                  // tell the thread to stop
-                  copiers[i].interrupt();
-                  // create a replacement thread
-                  copiers[i] = new MapOutputCopier();
-                  copiers[i].start();
-                }
-              }
-            }
-          } else {
-            Thread.sleep(lastStalledCheck + STALLED_COPY_CHECK - currentTime);
-          }
-        } catch (InterruptedException ie) {
-          return;
-        } catch (Throwable th) {
-          LOG.error("MapCopyLeaseChecker error: " + 
-                    StringUtils.stringifyException(th));
-        }
-      }      
-    }
-  }
 
   public ReduceTaskRunner(Task task, TaskTracker tracker, 
                           JobConf conf) throws IOException {
@@ -352,7 +293,6 @@
     DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
     Random         backoff = new Random();
     final Progress copyPhase = getTask().getProgress().phase();
-    MapCopyLeaseChecker leaseChecker = null;
     
     for (int i = 0; i < numOutputs; i++) {
       neededOutputs.add(new Integer(i));
@@ -367,8 +307,6 @@
       copiers[i] = new MapOutputCopier();
       copiers[i].start();
     }
-    leaseChecker = new MapCopyLeaseChecker();
-    leaseChecker.start();
     
     // start the clock for bandwidth measurement
     long startTime = System.currentTimeMillis();
@@ -450,6 +388,7 @@
       } catch (InterruptedException e) { } // IGNORE
 
       while (!killed && numInFlight > 0) {
+        LOG.debug(reduceTask.getTaskId() + " numInFlight = " + numInFlight);
         CopyResult cr = getCopyResult();
         
         if (cr != null) {
@@ -506,7 +445,6 @@
     }
 
     // all done, inform the copiers to exit
-    leaseChecker.interrupt();
     synchronized (copiers) {
       synchronized (scheduledCopies) {
         for (int i=0; i < copiers.length; i++) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=481429&r1=481428&r2=481429
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Fri Dec  1 14:26:30 2006
@@ -33,7 +33,7 @@
   public static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.mapred.TaskRunner");
 
-  boolean killed = false;
+  volatile boolean killed = false;
   private Process process;
   private Task t;
   private TaskTracker tracker;