You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/06/01 22:52:09 UTC

svn commit: r543607 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/MapTask.java src/java/org/apache/hadoop/mapred/ReduceTask.java

Author: cutting
Date: Fri Jun  1 13:52:09 2007
New Revision: 543607

URL: http://svn.apache.org/viewvc?view=rev&rev=543607
Log:
HADOOP-1431.  Fix so that sort progress reporting during map runs only while sorting, so that stuck maps are correctly terminated.  Contributed by Arun & Devaraj.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=543607&r1=543606&r2=543607
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Jun  1 13:52:09 2007
@@ -511,6 +511,10 @@
 130. HADOOP-1332.  Fix so that TaskTracker exits reliably during unit
      tests on Windows.  (omalley via cutting)
 
+131. HADOOP-1431.  Fix so that sort progress reporting during map runs
+     only while sorting, so that stuck maps are correctly terminated.
+     (Devaraj Das and Arun C Murthy via cutting)
+
 
 Release 0.12.3 - 2007-04-06
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=543607&r1=543606&r2=543607
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Fri Jun  1 13:52:09 2007
@@ -163,6 +163,7 @@
           throws IOException {
 
           setProgress(getProgress());
+          reportProgress(umbilical);
           long beforePos = getPos();
           boolean ret = rawIn.next(key, value);
           if (ret) {
@@ -178,22 +179,16 @@
         }
       };
 
-    Thread sortProgress = createProgressThread(umbilical);
     MapRunnable runner =
       (MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
 
     try {
-      sortProgress.start();
       runner.run(in, collector, reporter);      
       collector.flush();
     } finally {
       //close
       in.close();                               // close input
       collector.close();
-      sortProgress.interrupt();
-      try {
-        sortProgress.join();
-      } catch (InterruptedException ie){ }
     }
     done(umbilical);
   }
@@ -220,6 +215,7 @@
       };
     sortProgress.setName("Sort progress reporter for task "+getTaskId());
     sortProgress.setDaemon(true);
+    sortProgress.start();
     return sortProgress;
   }
   
@@ -273,6 +269,7 @@
     private Partitioner partitioner;
     private JobConf job;
     private Reporter reporter;
+    final private TaskUmbilicalProtocol umbilical;
 
     private DataOutputBuffer keyValBuffer; //the buffer where key/val will
                                            //be stored before they are 
@@ -302,6 +299,7 @@
 
       this.job = job;
       this.reporter = reporter;
+      this.umbilical = umbilical;
       this.comparator = job.getOutputKeyComparator();
       this.keyClass = job.getMapOutputKeyClass();
       this.valClass = job.getMapOutputValueClass();
@@ -324,6 +322,7 @@
                                                                 job.getClass("map.sort.class", MergeSorter.class,
                                                                              BufferSorter.class), job);
     }
+    
     private void startPartition(int partNumber) throws IOException {
       //We create the sort output as multiple sequence files within a spilled
       //file. So we create a writer for each partition. 
@@ -376,10 +375,20 @@
         for (int i = 0; i < partitions; i++)
           totalMem += sortImpl[i].getMemoryUtilized();
         if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) {
-          sortAndSpillToDisk();
-          keyValBuffer.reset();
-          for (int i = 0; i < partitions; i++)
-            sortImpl[i].close(); 
+
+          // Start the progress thread
+          Thread progress = createProgressThread(umbilical);
+
+          try {
+            sortAndSpillToDisk();
+            keyValBuffer.reset();
+            for (int i = 0; i < partitions; i++) {
+              sortImpl[i].close();
+            }
+          } finally {
+            // Stop the progress thread
+            progress.interrupt();
+          }
         }
       }
     }
@@ -597,13 +606,22 @@
     }
 
     public void flush() throws IOException {
-      //check whether the length of the key/value buffer is 0. If not, then
-      //we need to spill that to disk. Note that we reset the key/val buffer
-      //upon each spill (so a length > 0 means that we have not spilled yet)
-      if (keyValBuffer.getLength() > 0) {
-        sortAndSpillToDisk();
+
+      // Start the progress thread
+      Thread progress = createProgressThread(umbilical);
+
+      try {
+        //check whether the length of the key/value buffer is 0. If not, then
+        //we need to spill that to disk. Note that we reset the key/val buffer
+        //upon each spill (so a length > 0 means that we have not spilled yet)
+        if (keyValBuffer.getLength() > 0) {
+          sortAndSpillToDisk();
+        }
+        mergeParts();
+      } finally {
+        // Stop the progress thread
+        progress.interrupt();
       }
-      mergeParts();
     }
   }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=543607&r1=543606&r2=543607
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Fri Jun  1 13:52:09 2007
@@ -37,6 +37,7 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -78,7 +79,7 @@
   
   private static final Log LOG = LogFactory.getLog(ReduceTask.class.getName());
   private int numMaps;
-  private boolean sortComplete;
+  AtomicBoolean sortComplete = new AtomicBoolean(false);
   private ReduceCopier reduceCopier;
 
   { 
@@ -283,7 +284,7 @@
     // spawn a thread to give sort progress heartbeats
     Thread sortProgress = new Thread() {
         public void run() {
-          while (!sortComplete) {
+          while (!sortComplete.get()) {
             try {
               reportProgress(umbilical);
               Thread.sleep(PROGRESS_INTERVAL);
@@ -298,6 +299,7 @@
           }
         }
       };
+    sortProgress.setDaemon(true);
     sortProgress.setName("Sort progress reporter for task "+getTaskId());
 
     Path tempDir = new Path(getTaskId()); 
@@ -317,7 +319,7 @@
                            !conf.getKeepFailedTaskFiles()); // sort
 
     } finally {
-      sortComplete = true;
+      sortComplete.set(true);
     }
 
     sortPhase.complete();                         // sort is complete