You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/12/16 00:31:39 UTC

svn commit: r487715 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/MapTask.java

Author: cutting
Date: Fri Dec 15 15:31:39 2006
New Revision: 487715

URL: http://svn.apache.org/viewvc?view=rev&rev=487715
Log:
HADOOP-813.  Fix map output sorting to report progress.  Contributed by Devaraj.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=487715&r1=487714&r2=487715
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Dec 15 15:31:39 2006
@@ -101,6 +101,10 @@
 28. HADOOP-824.  Rename DFSShell to be FsShell, since it applies
     generically to all FileSystem implementations.  (cutting)
 
+29. HADOOP-813.  Fix map output sorting to report progress, so that
+    sorts which take longer than the task timeout do not fail.
+    (Devaraj Das via cutting)
+
 
 Release 0.9.2 - 2006-12-15
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=487715&r1=487714&r2=487715
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Fri Dec 15 15:31:39 2006
@@ -35,6 +35,7 @@
 import org.apache.commons.logging.*;
 import org.apache.hadoop.metrics.Metrics;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
 
@@ -341,6 +342,7 @@
                   throws IOException {
                   synchronized (this) {
                     writer.append(key, value);
+                    reportProgress(umbilical);
                   }
                 }
               };
@@ -366,6 +368,7 @@
       while (values.more()) {
         combiner.reduce(values.getKey(), values, combineCollector, reporter);
         values.nextKey();
+        reportProgress(umbilical);
       }
     }
     
@@ -393,6 +396,7 @@
         value.readFields(valIn);
 
         writer.append(key, value);
+        reportProgress(umbilical);
       }
     }
     
@@ -425,62 +429,87 @@
                   compressionType, codec);
           finalIndexOut.writeLong(segmentStart);
           finalIndexOut.writeLong(finalOut.getPos() - segmentStart);
+          reportProgress(umbilical);
         }
         finalOut.close();
         finalIndexOut.close();
         return;
       }
-      
-      Path [] filename = new Path[numSpills];
-      Path [] indexFileName = new Path[numSpills];
-      FSDataInputStream in[] = new FSDataInputStream[numSpills];
-      FSDataInputStream indexIn[] = new FSDataInputStream[numSpills];
-      
-      for(int i = 0; i < numSpills; i++) {
-        filename[i] = mapOutputFile.getSpillFile(getTaskId(), i);
-        in[i] = localFs.open(filename[i]);
-        indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskId(), i);
-        indexIn[i] = localFs.open(indexFileName[i]);
-      }
-      
-      //create a sorter object as we need access to the SegmentDescriptor
-      //class and merge methods
-      Sorter sorter = new Sorter(localFs, keyClass, valClass, job);
-      sorter.setFactor(numSpills);
-      
-      for (int parts = 0; parts < partitions; parts++){
-        List<SegmentDescriptor> segmentList = new ArrayList(numSpills);
+      //spawn a thread to give merge progress heartbeats
+      Thread sortProgress = new Thread() {
+        public void run() {
+          while (true) {
+            try {
+              reportProgress(umbilical);
+              Thread.sleep(PROGRESS_INTERVAL);
+            } catch (InterruptedException e) {
+                return;
+            } catch (Throwable e) {
+                LOG.info("Thread Exception in " +
+                                   "reporting sort progress\n" +
+                                   StringUtils.stringifyException(e));
+                continue;
+            }
+          }
+        }
+      };
+      sortProgress.setName("Sort progress reporter for task "+getTaskId());
+      sortProgress.setDaemon(true);
+      sortProgress.start();
+      try {
+        Path [] filename = new Path[numSpills];
+        Path [] indexFileName = new Path[numSpills];
+        FSDataInputStream in[] = new FSDataInputStream[numSpills];
+        FSDataInputStream indexIn[] = new FSDataInputStream[numSpills];
+        
         for(int i = 0; i < numSpills; i++) {
-          long segmentOffset = indexIn[i].readLong();
-          long segmentLength = indexIn[i].readLong();
-          SegmentDescriptor s = sorter.new SegmentDescriptor(segmentOffset,
-                  segmentLength, filename[i]);
-          s.preserveInput(true);
-          s.doSync();
-          segmentList.add(i, s);
+          filename[i] = mapOutputFile.getSpillFile(getTaskId(), i);
+          in[i] = localFs.open(filename[i]);
+          indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskId(), i);
+          indexIn[i] = localFs.open(indexFileName[i]);
         }
-        segmentStart = finalOut.getPos();
-        SequenceFile.Writer writer = SequenceFile.createWriter(job, finalOut, 
-                job.getMapOutputKeyClass(), job.getMapOutputValueClass(), 
-                compressionType, codec);
-        sorter.writeFile(sorter.merge(segmentList), writer);
-        //add a sync block - required esp. for block compression to ensure
-        //partition data don't span partition boundaries
-        writer.sync();
-        //when we write the offset/length to the final index file, we write
-        //longs for both. This helps us to reliably seek directly to the
-        //offset/length for a partition when we start serving the byte-ranges
-        //to the reduces. We probably waste some space in the file by doing
-        //this as opposed to writing VLong but it helps us later on.
-        finalIndexOut.writeLong(segmentStart);
-        finalIndexOut.writeLong(finalOut.getPos()-segmentStart);
-      }
-      finalOut.close();
-      finalIndexOut.close();
-      //cleanup
-      for(int i = 0; i < numSpills; i++) {
-        in[i].close(); localFs.delete(filename[i]);
-        indexIn[i].close(); localFs.delete(indexFileName[i]);
+        
+        //create a sorter object as we need access to the SegmentDescriptor
+        //class and merge methods
+        Sorter sorter = new Sorter(localFs, keyClass, valClass, job);
+        sorter.setFactor(numSpills);
+        
+        for (int parts = 0; parts < partitions; parts++){
+          List<SegmentDescriptor> segmentList = new ArrayList(numSpills);
+          for(int i = 0; i < numSpills; i++) {
+            long segmentOffset = indexIn[i].readLong();
+            long segmentLength = indexIn[i].readLong();
+            SegmentDescriptor s = sorter.new SegmentDescriptor(segmentOffset,
+                segmentLength, filename[i]);
+            s.preserveInput(true);
+            s.doSync();
+            segmentList.add(i, s);
+          }
+          segmentStart = finalOut.getPos();
+          SequenceFile.Writer writer = SequenceFile.createWriter(job, finalOut, 
+              job.getMapOutputKeyClass(), job.getMapOutputValueClass(), 
+              compressionType, codec);
+          sorter.writeFile(sorter.merge(segmentList), writer);
+          //add a sync block - required esp. for block compression to ensure
+          //partition data don't span partition boundaries
+          writer.sync();
+          //when we write the offset/length to the final index file, we write
+          //longs for both. This helps us to reliably seek directly to the
+          //offset/length for a partition when we start serving the byte-ranges
+          //to the reduces. We probably waste some space in the file by doing
+          //this as opposed to writing VLong but it helps us later on.
+          finalIndexOut.writeLong(segmentStart);
+          finalIndexOut.writeLong(finalOut.getPos()-segmentStart);
+        }
+        finalOut.close();
+        finalIndexOut.close();
+        //cleanup
+        for(int i = 0; i < numSpills; i++) {
+          in[i].close(); localFs.delete(filename[i]);
+          indexIn[i].close(); localFs.delete(indexFileName[i]);
+        }
+      } finally {
+        sortProgress.interrupt();
       }
     }