You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/03/02 19:10:35 UTC

svn commit: r632803 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/ReduceTask.java

Author: ddas
Date: Sun Mar  2 10:10:33 2008
New Revision: 632803

URL: http://svn.apache.org/viewvc?rev=632803&view=rev
Log:
HADOOP-910. Enables Reduces to do merges for the on-disk map output files in parallel with their copying. Contributed by Amar Kamat.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=632803&r1=632802&r2=632803&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Sun Mar  2 10:10:33 2008
@@ -50,6 +50,9 @@
     HADOOP-2895. Let the profiling string be configurable.
     (Martin Traverso via cdouglas)
 
+    HADOOP-910. Enables Reduces to do merges for the on-disk map output files 
+    in parallel with their copying. (Amar Kamat via ddas)
+
   OPTIMIZATIONS
 
     HADOOP-2790.  Fixed inefficient method hasSpeculativeTask by removing

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=632803&r1=632802&r2=632803&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Sun Mar  2 10:10:33 2008
@@ -36,6 +36,8 @@
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.Comparator;
 import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
@@ -47,6 +49,8 @@
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.ChecksumFileSystem;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.InputBuffer;
@@ -65,7 +69,6 @@
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
 import static org.apache.hadoop.mapred.Task.Counter.*;
 
@@ -98,6 +101,29 @@
     getCounters().findCounter(REDUCE_INPUT_RECORDS);
   private Counters.Counter reduceOutputCounter = 
     getCounters().findCounter(REDUCE_OUTPUT_RECORDS);
+  
+  // A custom comparator for map output files. Here the ordering is determined
+  // by the file's size and, for files of the same size, by the lexicographic
+  // order of their paths. This keeps compare(a, b) and compare(b, a) opposite
+  // in sign, as a sorted set requires. Same size and path compare as equal.
+  private Comparator<FileStatus> mapOutputFileComparator = 
+    new Comparator<FileStatus>() {
+      public int compare(FileStatus a, FileStatus b) {
+        if (a.getLen() < b.getLen()) {
+          return -1;
+        }
+        if (a.getLen() > b.getLen()) {
+          return 1;
+        }
+        // equal sizes: break the tie on the path string; the result is 0
+        // only for identical paths, so distinct files never collapse
+        return a.getPath().toString().compareTo(b.getPath().toString());
+      }
+  };
+  
+  // A sorted set for keeping a set of map output files on disk
+  private final SortedSet<FileStatus> mapOutputFilesOnDisk = 
+    new TreeSet<FileStatus>(mapOutputFileComparator);
 
   public ReduceTask() {
     super();
@@ -138,6 +164,24 @@
 
     numMaps = in.readInt();
   }
+  
+  // Collect the paths of the map output files this reducer has to merge.
+  private Path[] getMapFiles(FileSystem fs, boolean isLocal) 
+  throws IOException {
+    List<Path> inputs = new ArrayList<Path>();
+    if (!isLocal) {
+      // non local jobs: take the files tracked in mapOutputFilesOnDisk
+      for (FileStatus onDisk : mapOutputFilesOnDisk) {
+        inputs.add(onDisk.getPath());
+      }
+    } else {
+      // local jobs: ask mapOutputFile for the input file of each map
+      for (int mapIdx = 0; mapIdx < numMaps; ++mapIdx) {
+        inputs.add(mapOutputFile.getInputFile(mapIdx, getTaskId()));
+      }
+    }
+    return inputs.toArray(new Path[0]);
+  }
 
   /** Iterates values while keys match in sorted input. */
   static class ValuesIterator implements Iterator {
@@ -253,34 +297,19 @@
     startCommunicationThread(umbilical);
 
     FileSystem lfs = FileSystem.getLocal(job);
+    boolean isLocal = true;
     if (!job.get("mapred.job.tracker", "local").equals("local")) {
       reduceCopier = new ReduceCopier(umbilical, job);
       if (!reduceCopier.fetchOutputs()) {
         throw new IOException(getTaskId() + "The reduce copier failed");
       }
+      isLocal = false;
     }
     copyPhase.complete();                         // copy is already complete
     
 
-    // open a file to collect map output
-    // since we don't know how many map outputs got merged in memory, we have
-    // to check whether a given map output exists, and if it does, add it in
-    // the list of files to merge, otherwise not.
-    List<Path> mapFilesList = new ArrayList<Path>();
-    for(int i=0; i < numMaps; i++) {
-      Path f;
-      try {
-        //catch and ignore DiskErrorException, since some map outputs will
-        //really be absent (inmem merge).
-        f = mapOutputFile.getInputFile(i, getTaskId());
-      } catch (DiskErrorException d) { 
-        continue;
-      }
-      if (lfs.exists(f))
-        mapFilesList.add(f);
-    }
-    Path[] mapFiles = new Path[mapFilesList.size()];
-    mapFiles = mapFilesList.toArray(mapFiles);
+    // get the input files for the reducer to merge
+    Path[] mapFiles = getMapFiles(lfs, isLocal);
     
     Path tempDir = new Path(getTaskId()); 
 
@@ -298,6 +327,10 @@
     rIter = sorter.merge(mapFiles, tempDir, 
         !conf.getKeepFailedTaskFiles()); // sort
 
+    // free up the data structures
+    mapOutputFilesOnDisk.clear();
+    mapFiles = null;
+    
     sortPhase.complete();                         // sort is complete
     setPhase(TaskStatus.Phase.REDUCE); 
 
@@ -421,11 +454,21 @@
     private SequenceFile.Sorter sorter;
     
     /**
+     * Number of files to merge at a time
+     */
+    private int ioSortFactor;
+    
+    /**
      * A reference to the throwable object (if merge throws an exception)
      */
     private volatile Throwable mergeThrowable;
     
     /** 
+     * A flag to indicate that localFS merge is in progress
+     */
+    private volatile boolean localFSMergeInProgress = false;
+    
+    /** 
      * A flag to indicate that merge is in progress
      */
     private volatile boolean mergeInProgress = false;
@@ -755,12 +798,12 @@
         if (tmpFilename == null)
           throw new IOException("File " + filename + "-" + id + 
                                 " not created");
+        // This file could have been created in the inmemory
+        // fs or the localfs. So need to get the filesystem owning the path. 
+        FileSystem fs = tmpFilename.getFileSystem(conf);
         long bytes = -1;
         // lock the ReduceTask while we do the rename
         synchronized (ReduceTask.this) {
-          // This file could have been created in the inmemory
-          // fs or the localfs. So need to get the filesystem owning the path. 
-          FileSystem fs = tmpFilename.getFileSystem(conf);
           if (!neededOutputs.contains(loc.getMapId())) {
             fs.delete(tmpFilename);
             return CopyResult.OBSOLETE;
@@ -802,6 +845,16 @@
           }
           neededOutputs.remove(loc.getMapId());
         }
+        
+        // Check whether the map output file landed on the local file system
+        // by comparing its file system's URI scheme with the local one's
+        String localFSScheme = localFileSys.getUri().getScheme();
+        String outputFileScheme = fs.getUri().getScheme();
+        if (localFSScheme.equals(outputFileScheme)) {
+          synchronized (mapOutputFilesOnDisk) {        
+            mapOutputFilesOnDisk.add(fs.getFileStatus(filename));
+          }
+        }
         return bytes;
       }
       
@@ -861,6 +914,7 @@
       this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
       this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
       
+      this.ioSortFactor = conf.getInt("io.sort.factor", 10);
       // the exponential backoff formula
       //    backoff (t) = init * base^(t-1)
       // so for max retries we get
@@ -931,6 +985,13 @@
       copiers = new MapOutputCopier[numCopiers];
       
       Reporter reporter = getReporter(umbilical);
+      // create an instance of the sorter for merging the on-disk files
+      SequenceFile.Sorter localFileSystemSorter = 
+        new SequenceFile.Sorter(localFileSys, conf.getOutputKeyComparator(), 
+                                conf.getMapOutputKeyClass(),
+                                conf.getMapOutputValueClass(), conf);
+      localFileSystemSorter.setProgressable(reporter);
+      
       // start all the copying threads
       for (int i=0; i < copiers.length; i++) {
         copiers[i] = new MapOutputCopier(reporter);
@@ -1040,6 +1101,23 @@
                    " of " + numKnown + " known outputs (" + numSlow +
                    " slow hosts and " + numDups + " dup hosts)");
           
+          // Check if an on-disk merge can be done. This will help if there
+          // are no copies to be fetched but sufficient copies to be merged.
+          synchronized (mapOutputFilesOnDisk) {
+            if (!localFSMergeInProgress
+                && (mapOutputFilesOnDisk.size() >= (2 * ioSortFactor - 1))) {
+              // make sure that only one thread merges the disk files
+              localFSMergeInProgress = true;
+              // start the on-disk-merge process
+              LocalFSMerger lfsm =  
+                new LocalFSMerger((LocalFileSystem)localFileSys, 
+                                  localFileSystemSorter);
+              lfsm.setName("Thread for merging on-disk files");
+              lfsm.setDaemon(true);
+              lfsm.start();
+            }
+          }
+          
           // if we have no copies in flight and we can't schedule anything
           // new, just wait for a bit
           try {
@@ -1214,6 +1292,11 @@
         //Do a merge of in-memory files (if there are any)
         if (mergeThrowable == null) {
           try {
+            // Wait for the on-disk merge to complete
+            while (localFSMergeInProgress) {
+              Thread.sleep(200);
+            }
+            
             //wait for an ongoing merge (if it is in flight) to complete
             while (mergeInProgress) {
               Thread.sleep(200);
@@ -1266,6 +1349,11 @@
                      " Merge of the " +inMemClosedFiles.length +
                      " files in InMemoryFileSystem complete." +
                      " Local file is " + outputPath);
+            
+            FileStatus status = localFileSys.getFileStatus(outputPath);
+            synchronized (mapOutputFilesOnDisk) {
+              mapOutputFilesOnDisk.add(status);
+            }
           } catch (Throwable t) {
             LOG.warn(reduceTask.getTaskId() +
                      " Final merge of the inmemory files threw an exception: " + 
@@ -1393,6 +1481,75 @@
     }
     
     
+    /** Merges map output files already on the local disk while the shuffle
+     * is still copying, i.e. overlaps the shuffle and merge phases. Each run
+     * replaces ioSortFactor tracked on-disk files by one merged file.
+     */
+    private class LocalFSMerger extends Thread {
+      private LocalFileSystem localFileSys;
+      private SequenceFile.Sorter sorter;
+
+      public LocalFSMerger(LocalFileSystem fs, SequenceFile.Sorter sorter) {
+        this.localFileSys = fs;
+        this.sorter = sorter;
+      }
+
+      public void run() {
+        try {
+          Path[] mapFiles = new Path[ioSortFactor];
+          long approxOutputSize = 0;
+          int bytesPerSum = 
+            reduceTask.getConf().getInt("io.bytes.per.checksum", 512);
+          LOG.info(reduceTask.getTaskId() + " Merging map output files on disk");
+          // 1. Prepare the list of files to be merged. Entries are taken
+          // (and removed) from the front of the sorted set of on-disk map
+          // output files. Currently we merge io.sort.factor files into 1.
+          synchronized (mapOutputFilesOnDisk) {
+            for (int i = 0; i < ioSortFactor; ++i) {
+              FileStatus filestatus = mapOutputFilesOnDisk.first();
+              mapOutputFilesOnDisk.remove(filestatus);
+              mapFiles[i] = filestatus.getPath();
+              approxOutputSize += filestatus.getLen();
+            }
+          }
+          // add the checksum length
+          approxOutputSize += ChecksumFileSystem
+                              .getChecksumLength(approxOutputSize,
+                                                 bytesPerSum);
+
+          // 2. Start the on-disk merge; output is "<first input>.merged"
+          Path outputPath = 
+            lDirAlloc.getLocalPathForWrite(mapFiles[0].toString(), 
+                                           approxOutputSize, conf)
+            .suffix(".merged");
+          SequenceFile.Writer writer =
+            sorter.cloneFileAttributes(mapFiles[0], outputPath, null);
+          SequenceFile.Sorter.RawKeyValueIterator iter;
+          Path tmpDir = new Path(reduceTask.getTaskId());
+          iter = sorter.merge(mapFiles, true, ioSortFactor, tmpDir);
+          sorter.writeFile(iter, writer);
+          writer.close();
+          
+          synchronized (mapOutputFilesOnDisk) {
+            mapOutputFilesOnDisk.add(localFileSys.getFileStatus(outputPath));
+          }
+          
+          LOG.info(reduceTask.getTaskId()
+                   + " Finished merging map output files on disk.");
+        } catch (Throwable t) {
+          // Throwable, not just IOException: inputs were already untracked
+          LOG.warn(reduceTask.getTaskId()
+                   + " Merging of the local FS files threw an exception: "
+                   + StringUtils.stringifyException(t));
+          if (mergeThrowable == null) {
+            mergeThrowable = t;
+          }
+        } finally {
+          localFSMergeInProgress = false;
+        }
+      }
+    }
+
     private class InMemFSMergeThread extends Thread {
       private InMemoryFileSystem inMemFileSys;
       private LocalFileSystem localFileSys;
@@ -1451,6 +1608,11 @@
                      " Merge of the " +inMemClosedFiles.length +
                      " files in InMemoryFileSystem complete." +
                      " Local file is " + outputPath);
+            
+            FileStatus status = localFileSys.getFileStatus(outputPath);
+            synchronized (mapOutputFilesOnDisk) {
+              mapOutputFilesOnDisk.add(status);
+            }
           }
           else {
             LOG.info(reduceTask.getTaskId() + " Nothing to merge from " +