You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/09/11 22:26:16 UTC

svn commit: r694459 - in /hadoop/core/trunk: ./ conf/ src/docs/src/documentation/content/xdocs/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: cdouglas
Date: Thu Sep 11 13:26:11 2008
New Revision: 694459

URL: http://svn.apache.org/viewvc?rev=694459&view=rev
Log:
HADOOP-3446. Keep map outputs in memory during the reduce. Remove
fs.inmemory.size.mb and replace with properties defining in memory map
output retention during the shuffle and reduce relative to maximum heap
usage.

Added:
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/conf/hadoop-default.xml
    hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapCollection.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=694459&r1=694458&r2=694459&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Sep 11 13:26:11 2008
@@ -51,6 +51,11 @@
     committing output files. Moves job setup to jobclient, and moves jobcleanup
     to a separate task. (Amareshwari Sriramadasu via ddas)
 
+    HADOOP-3446. Keep map outputs in memory during the reduce. Remove
+    fs.inmemory.size.mb and replace with properties defining in memory map
+    output retention during the shuffle and reduce relative to maximum heap
+    usage. (cdouglas)
+
   NEW FEATURES
 
     HADOOP-3341. Allow streaming jobs to specify the field separator for map

Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=694459&r1=694458&r2=694459&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Thu Sep 11 13:26:11 2008
@@ -233,12 +233,6 @@
 </property>
 
 <property>
-  <name>fs.inmemory.size.mb</name>
-  <value>75</value>
-  <description>The size of the in-memory filsystem instance in MB</description>
-</property>
-
-<property>
   <name>fs.checkpoint.dir</name>
   <value>${hadoop.tmp.dir}/dfs/namesecondary</value>
   <description>Determines where on the local filesystem the DFS secondary
@@ -895,6 +889,34 @@
 </property>
 
 <property>
+  <name>mapred.job.shuffle.merge.percent</name>
+  <value>0.66</value>
+  <description>The usage threshold at which an in-memory merge will be
+  initiated, expressed as a percentage of the total memory allocated to
+  storing in-memory map outputs, as defined by
+  mapred.job.shuffle.input.buffer.percent.
+  </description>
+</property>
+
+<property>
+  <name>mapred.job.shuffle.input.buffer.percent</name>
+  <value>0.70</value>
+  <description>The percentage of memory to be allocated from the maximum heap
+  size to storing map outputs during the shuffle.
+  </description>
+</property>
+
+<property>
+  <name>mapred.job.reduce.input.buffer.percent</name>
+  <value>0.0</value>
+  <description>The percentage of memory, relative to the maximum heap size, in
+  which map outputs may be retained during the reduce. When the shuffle is
+  concluded, any remaining map outputs in memory must consume less than this
+  threshold before the reduce can begin.
+  </description>
+</property>
+
+<property>
   <name>mapred.map.tasks.speculative.execution</name>
   <value>true</value>
   <description>If true, then multiple instances of some map tasks 

Modified: hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml?rev=694459&r1=694458&r2=694459&view=diff
==============================================================================
--- hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml (original)
+++ hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml Thu Sep 11 13:26:11 2008
@@ -1118,7 +1118,157 @@
         greater than any value specified for a maximum heap-size
         of the child jvm via <code>mapred.child.java.opts</code>, or a ulimit
         value in <code>mapred.child.ulimit</code>. </p>
-        
+
+        <p>The memory available to some parts of the framework is also
+        configurable. In map and reduce tasks, performance may be influenced
+        by adjusting parameters influencing the concurrency of operations and
+        the frequency with which data will hit disk. Monitoring the filesystem
+        counters for a job- particularly relative to byte counts from the map
+        and into the reduce- is invaluable to the tuning of these
+        parameters.</p>
+
+        <section>
+          <title>Map Parameters</title>
+
+          <p>A record emitted from a map will be serialized into a buffer and
+          metadata will be stored into accounting buffers. As described in the
+          following options, when either the serialization buffer or the
+          metadata exceed a threshold, the contents of the buffers will be
+          sorted and written to disk in the background while the map continues
+          to output records. If either buffer fills completely while the spill
+          is in progress, the map thread will block. When the map is finished,
+          any remaining records are written to disk and all on-disk segments
+          are merged into a single file. Minimizing the number of spills to
+          disk can decrease map time, but a larger buffer also decreases the
+          memory available to the mapper.</p>
+
+          <table>
+            <tr><th>Name</th><th>Type</th><th>Description</th></tr>
+            <tr><td>io.sort.mb</td><td>int</td>
+                <td>The cumulative size of the serialization and accounting
+                buffers storing records emitted from the map, in megabytes.
+                </td></tr>
+            <tr><td>io.sort.record.percent</td><td>float</td>
+                <td>The ratio of serialization to accounting space can be
+                adjusted. Each serialized record requires 16 bytes of
+                accounting information in addition to its serialized size to
+                effect the sort. This percentage of space allocated from
+                <code>io.sort.mb</code> affects the probability of a spill to
+                disk being caused by either exhaustion of the serialization
+                buffer or the accounting space. Clearly, for a map outputting
+                small records, a higher value than the default will likely
+                decrease the number of spills to disk.</td></tr>
+            <tr><td>io.sort.spill.percent</td><td>float</td>
+                <td>This is the threshold for the accounting and serialization
+                buffers. When this percentage of either buffer has filled,
+                their contents will be spilled to disk in the background. Let
+                <code>io.sort.record.percent</code> be <em>r</em>,
+                <code>io.sort.mb</code> be <em>x</em>, and this value be
+                <em>q</em>. The maximum number of records collected before the
+                collection thread will spill is <code>r * x * q * 2^16</code>.
+                Note that a higher value may decrease the number of- or even
+                eliminate- merges, but will also increase the probability of
+                the map task getting blocked. The lowest average map times are
+                usually obtained by accurately estimating the size of the map
+                output and preventing multiple spills.</td></tr>
+          </table>
+
+          <p>Other notes</p>
+          <ul>
+            <li>If either spill threshold is exceeded while a spill is in
+            progress, collection will continue until the spill is finished.
+            For example, if <code>io.sort.spill.percent</code> is set
+            to 0.33, and the remainder of the buffer is filled while the spill
+            runs, the next spill will include all the collected records, or
+            0.66 of the buffer, and will not generate additional spills. In
+            other words, the thresholds define when a spill is triggered;
+            they do not block collection.</li>
+            <li>A record larger than the serialization buffer will first
+            trigger a spill, then be spilled to a separate file. It is
+            undefined whether or not this record will first pass through the
+            combiner.</li>
+          </ul>
+        </section>
+
+        <section>
+          <title>Shuffle/Reduce Parameters</title>
+
+          <p>As described previously, each reduce fetches the output assigned
+          to it by the Partitioner via HTTP into memory and periodically
+          merges these outputs to disk. If intermediate compression of map
+          outputs is turned on, each output is decompressed into memory. The
+          following options affect the frequency of these merges to disk prior
+          to the reduce and the memory allocated to map output during the
+          reduce.</p>
+
+          <table>
+            <tr><th>Name</th><th>Type</th><th>Description</th></tr>
+            <tr><td>io.sort.factor</td><td>int</td>
+                <td>Specifies the number of segments on disk to be merged at
+                the same time. It limits the number of open files and
+                compression codecs during the merge. If the number of files
+                exceeds this limit, the merge will proceed in several passes.
+                Though this limit also applies to the map, most jobs should be
+                configured so that hitting this limit is unlikely
+                there.</td></tr>
+            <tr><td>mapred.inmem.merge.threshold</td><td>int</td>
+                <td>The number of sorted map outputs fetched into memory
+                before being merged to disk. Like the spill thresholds in the
+                preceding note, this is not defining a unit of partition, but
+                a trigger. In practice, this is usually set very high (1000)
+                or disabled (0), since merging in-memory segments is often
+                less expensive than merging from disk (see notes following
+                this table). This threshold influences only the frequency of
+                in-memory merges during the shuffle.</td></tr>
+            <tr><td>mapred.job.shuffle.merge.percent</td><td>float</td>
+                <td>The memory threshold for fetched map outputs before an
+                in-memory merge is started, expressed as a percentage of
+                memory allocated to storing map outputs in memory. Since map
+                outputs that can't fit in memory can be stalled, setting this
+                high may decrease parallelism between the fetch and merge.
+                Conversely, values as high as 1.0 have been effective for
+                reduces whose input can fit entirely in memory. This parameter
+                influences only the frequency of in-memory merges during the
+                shuffle.</td></tr>
+            <tr><td>mapred.job.shuffle.input.buffer.percent</td><td>float</td>
+                <td>The percentage of memory- relative to the maximum heapsize
+                as typically specified in <code>mapred.child.java.opts</code>-
+                that can be allocated to storing map outputs during the
+                shuffle. Though some memory should be set aside for the
+                framework, in general it is advantageous to set this high
+                enough to store large and numerous map outputs.</td></tr>
+            <tr><td>mapred.job.reduce.input.buffer.percent</td><td>float</td>
+                <td>The percentage of memory relative to the maximum heapsize
+                in which map outputs may be retained during the reduce. When
+                the reduce begins, map outputs will be merged to disk until
+                those that remain are under the resource limit this defines.
+                By default, all map outputs are merged to disk before the
+                reduce begins to maximize the memory available to the reduce.
+                For less memory-intensive reduces, this should be increased to
+                avoid trips to disk.</td></tr>
+          </table>
+
+          <p>Other notes</p>
+          <ul>
+            <li>If a map output is larger than 25 percent of the memory
+            allocated to copying map outputs, it will be written directly to
+            disk without first staging through memory.</li>
+            <li>When running with a combiner, the reasoning about high merge
+            thresholds and large buffers may not hold. For merges started
+            before all map outputs have been fetched, the combiner is run
+            while spilling to disk. In some cases, one can obtain better
+            reduce times by spending resources combining map outputs- making
+            disk spills small and parallelizing spilling and fetching- rather
+            than aggressively increasing buffer sizes.</li>
+            <li>When merging in-memory map outputs to disk to begin the
+            reduce, if an intermediate merge is necessary because there are
+            segments to spill and at least <code>io.sort.factor</code>
+            segments already on disk, the in-memory map outputs will be part
+            of the intermediate merge.</li>
+          </ul>
+
+        </section>
+
         <p>The task tracker has local directory,
         <code> ${mapred.local.dir}/taskTracker/</code> to create localized
         cache and localized job. It can define multiple local directories 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java?rev=694459&r1=694458&r2=694459&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java Thu Sep 11 13:26:11 2008
@@ -221,12 +221,12 @@
     private static final int DEFAULT_BUFFER_SIZE = 128*1024;
     private static final int MAX_VINT_SIZE = 9;
 
-    InputStream in;            // Possibly decompressed stream that we read
+    final InputStream in;        // Possibly decompressed stream that we read
     Decompressor decompressor;
     long bytesRead = 0;
-    long fileLength = 0;
+    final long fileLength;
     boolean eof = false;
-    IFileInputStream checksumIn;
+    final IFileInputStream checksumIn;
     
     byte[] buffer = null;
     int bufferSize = DEFAULT_BUFFER_SIZE;
@@ -251,8 +251,6 @@
            fs.getFileStatus(file).getLen(),
            codec);
     }
-    
-    protected Reader() {}
 
     /**
      * Construct an IFile Reader.
@@ -264,7 +262,6 @@
      * @param codec codec
      * @throws IOException
      */
-    
     public Reader(Configuration conf, FSDataInputStream in, long length, 
                   CompressionCodec codec) throws IOException {
       checksumIn = new IFileInputStream(in,length);
@@ -276,7 +273,9 @@
       }
       this.fileLength = length;
       
-      this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
+      if (conf != null) {
+        bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
+      }
     }
     
     public long getLength() { 
@@ -430,12 +429,14 @@
     TaskAttemptID taskAttemptId;
     
     public InMemoryReader(RamManager ramManager, TaskAttemptID taskAttemptId,
-                          byte[] data, int start, int length) {
+                          byte[] data, int start, int length)
+                          throws IOException {
+      super(null, null, length - start, null);
       this.ramManager = ramManager;
       this.taskAttemptId = taskAttemptId;
       
       buffer = data;
-      fileLength = bufferSize = (length - start);
+      bufferSize = (int)fileLength;
       dataIn.reset(buffer, start, length);
     }
     

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java?rev=694459&r1=694458&r2=694459&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java Thu Sep 11 13:26:11 2008
@@ -31,20 +31,6 @@
 
   public static final long COUNTER_UPDATE_INTERVAL = 60 * 1000;
 
-  //for the inmemory filesystem (to do in-memory merge)
-  /**
-   * Constant denoting when a merge of in memory files will be triggered 
-   */
-  public static final float MAX_INMEM_FILESYS_USE = 0.66f;
-  
-  /**
-   * Constant denoting the max size (in terms of the fraction of the total 
-   * size of the filesys) of a map output file that we will try
-   * to keep in mem. Ideally, this should be a factor of MAX_INMEM_FILESYS_USE
-   */
-  public static final float MAX_INMEM_FILESIZE_FRACTION =
-    MAX_INMEM_FILESYS_USE/2;
-    
   //
   // Result codes
   //

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java?rev=694459&r1=694458&r2=694459&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java Thu Sep 11 13:26:11 2008
@@ -68,11 +68,36 @@
                             List<Segment<K, V>> segments, 
                             int mergeFactor, Path tmpDir,
                             RawComparator<K> comparator, Progressable reporter)
-  throws IOException {
-    return 
-      new MergeQueue<K, V>(conf, fs, segments, 
-                           comparator, reporter).merge(keyClass, valueClass,
-                                                       mergeFactor, tmpDir);
+      throws IOException {
+    return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir,
+                 comparator, reporter, false);
+  }
+
+  public static <K extends Object, V extends Object>
+  RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                            Class<K> keyClass, Class<V> valueClass,
+                            List<Segment<K, V>> segments,
+                            int mergeFactor, Path tmpDir,
+                            RawComparator<K> comparator, Progressable reporter,
+                            boolean sortSegments)
+      throws IOException {
+    return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
+                           sortSegments).merge(keyClass, valueClass,
+                                               mergeFactor, tmpDir);
+  }
+
+  static <K extends Object, V extends Object>
+    RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                            Class<K> keyClass, Class<V> valueClass,
+                            List<Segment<K, V>> segments,
+                            int mergeFactor, int inMemSegments, Path tmpDir,
+                            RawComparator<K> comparator, Progressable reporter,
+                            boolean sortSegments)
+      throws IOException {
+    return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
+                           sortSegments).merge(keyClass, valueClass,
+                                               mergeFactor, inMemSegments,
+                                               tmpDir);
   }
 
   public static <K extends Object, V extends Object>
@@ -201,15 +226,23 @@
       Collections.sort(segments, segmentComparator); 
     }
     
+    public MergeQueue(Configuration conf, FileSystem fs,
+        List<Segment<K, V>> segments, RawComparator<K> comparator,
+        Progressable reporter) {
+      this(conf, fs, segments, comparator, reporter, false);
+    }
 
     public MergeQueue(Configuration conf, FileSystem fs, 
         List<Segment<K, V>> segments, RawComparator<K> comparator,
-        Progressable reporter) {
+        Progressable reporter, boolean sortSegments) {
       this.conf = conf;
       this.fs = fs;
       this.comparator = comparator;
       this.segments = segments;
       this.reporter = reporter;
+      if (sortSegments) {
+        Collections.sort(segments, segmentComparator);
+      }
     }
 
     public void close() throws IOException {
@@ -277,7 +310,13 @@
     
     public RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
                                      int factor, Path tmpDir) 
-    throws IOException {
+        throws IOException {
+      return merge(keyClass, valueClass, factor, 0, tmpDir);
+    }
+
+    RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
+                                     int factor, int inMem, Path tmpDir)
+        throws IOException {
       LOG.info("Merging " + segments.size() + " sorted segments");
       
       //create the MergeStreams from the sorted map created in the constructor
@@ -286,8 +325,13 @@
       int origFactor = factor;
       int passNo = 1;
       do {
-        //get the factor for this pass of merge
-        factor = getPassFactor(factor, passNo, numSegments);
+        //get the factor for this pass of merge. We assume in-memory segments
+        //are the first entries in the segment list and that the pass factor
+        //doesn't apply to them
+        factor = getPassFactor(factor, passNo, numSegments - inMem);
+        if (1 == passNo) {
+          factor += inMem;
+        }
         List<Segment<K, V>> segmentsToMerge =
           new ArrayList<Segment<K, V>>();
         int segmentsConsidered = 0;
@@ -326,7 +370,8 @@
         }
         
         //feed the streams to the priority queue
-        initialize(segmentsToMerge.size()); clear();
+        initialize(segmentsToMerge.size());
+        clear();
         for (Segment<K, V> segment : segmentsToMerge) {
           put(segment);
         }
@@ -395,7 +440,12 @@
       } while(true);
     }
     
-    //HADOOP-591
+    /**
+     * Determine the number of segments to merge in a given pass. Assuming more
+     * than factor segments, the first pass should attempt to bring the total
+     * number of segments - 1 to be divisible by the factor - 1 (each pass
+     * takes X segments and produces 1) to minimize the number of merges.
+     */
     private int getPassFactor(int factor, int passNo, int numSegments) {
       if (passNo > 1 || numSegments <= factor || factor == 1) 
         return factor;

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=694459&r1=694458&r2=694459&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Thu Sep 11 13:26:11 2008
@@ -335,44 +335,30 @@
       return;
     }
     
-    FileSystem lfs = FileSystem.getLocal(job);
-    FileSystem rfs = ((LocalFileSystem)lfs).getRaw();
-
     // Initialize the codec
     codec = initCodec();
 
-    boolean isLocal = true;
-    if (!job.get("mapred.job.tracker", "local").equals("local")) {
+    boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
+    if (!isLocal) {
       reduceCopier = new ReduceCopier(umbilical, job);
       if (!reduceCopier.fetchOutputs()) {
         throw new IOException(getTaskID() + "The reduce copier failed");
       }
-      isLocal = false;
     }
     copyPhase.complete();                         // copy is already complete
-    
-
-    // get the input files for the reducer to merge
-    Path[] mapFiles = getMapFiles(lfs, isLocal);
-    
-    Path tempDir = new Path(getTaskID().toString()); 
- 
-    setPhase(TaskStatus.Phase.SORT); 
+    setPhase(TaskStatus.Phase.SORT);
 
-    
-    // sort the input file
-    LOG.info("Initiating final on-disk merge with " + mapFiles.length + 
-             " files");
-    RawKeyValueIterator rIter = 
-      Merger.merge(job,rfs,
-                   job.getMapOutputKeyClass(), job.getMapOutputValueClass(),
-                   codec, mapFiles, !conf.getKeepFailedTaskFiles(), 
-                   job.getInt("io.sort.factor", 100), tempDir, 
-                   job.getOutputKeyComparator(), reporter); 
+    final FileSystem rfs = FileSystem.getLocal(job).getRaw();
+    RawKeyValueIterator rIter = isLocal
+      ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
+          job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
+          !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
+          new Path(getTaskID().toString()), job.getOutputKeyComparator(),
+          reporter)
+      : reduceCopier.createKVIterator(job, rfs, reporter);
         
     // free up the data structures
     mapOutputFilesOnDisk.clear();
-    mapFiles = null;
     
     sortPhase.complete();                         // sort is complete
     setPhase(TaskStatus.Phase.REDUCE); 
@@ -528,10 +514,21 @@
     private volatile boolean exitLocalFSMerge = false;
     
     /**
-     * When we accumulate mergeThreshold number of files in ram, we merge/spill
+     * When we accumulate maxInMemOutputs number of files in ram, we merge/spill
      */
-    private int mergeThreshold = 500;
-    
+    private final int maxInMemOutputs;
+
+    /**
+     * Usage threshold for in-memory output accumulation.
+     */
+    private final float maxInMemCopyPer;
+
+    /**
+     * Maximum memory usage of map outputs to merge from memory into
+     * the reduce, in bytes.
+     */
+    private final long maxInMemReduce;
+
     /**
      * The threads for fetching the files.
      */
@@ -566,12 +563,7 @@
       Collections.synchronizedSet(new TreeSet<TaskAttemptID>());
     
     private Random random = null;
-    
-    /**
-     * the max size of the merge output from ramfs
-     */
-    private long ramfsMergeOutputSize;
-    
+
     /**
      * the max of all the map completion times
      */
@@ -849,8 +841,16 @@
       private int numClosed = 0;
       private boolean closed = false;
       
-      public ShuffleRamManager(Configuration conf) {
-        maxSize = conf.getInt("fs.inmemory.size.mb", 100) * 1024 * 1024;
+      public ShuffleRamManager(Configuration conf) throws IOException {
+        final float maxInMemCopyUse =
+          conf.getFloat("mapred.job.shuffle.input.buffer.percent", 0.70f);
+        if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
+          throw new IOException("mapred.job.shuffle.input.buffer.percent" +
+                                maxInMemCopyUse);
+        }
+        maxSize = (int)Math.min(
+            Runtime.getRuntime().maxMemory() * maxInMemCopyUse,
+            Integer.MAX_VALUE);
         maxSingleShuffleLimit = (int)(maxSize * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);
         LOG.info("ShuffleRamManager: MemoryLimit=" + maxSize + 
                  ", MaxSingleShuffleLimit=" + maxSingleShuffleLimit);
@@ -912,14 +912,11 @@
                  &&
                  // In-memory threshold exceeded and at least two segments
                  // have been fetched
-                 (getPercentUsed() < MAX_INMEM_FILESYS_USE ||
-                  numClosed < 
-                    (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)
-                 ) 
+                 (getPercentUsed() < maxInMemCopyPer || numClosed < 2)
                  &&
                  // More than "mapred.inmem.merge.threshold" map outputs
                  // have been fetched into memory
-                 (mergeThreshold <= 0 || numClosed < mergeThreshold) 
+                 (maxInMemOutputs <= 0 || numClosed < maxInMemOutputs)
                  && 
                  // More than MAX... threads are blocked on the RamManager
                  // or the blocked threads are the last map outputs to be
@@ -1545,13 +1542,21 @@
       // optimizing for the base 2
       this.maxFetchRetriesPerMap = getClosestPowerOf2((this.maxBackoff * 1000 
                                                        / BACKOFF_INIT) + 1); 
-      this.mergeThreshold = conf.getInt("mapred.inmem.merge.threshold", 1000);
+      this.maxInMemOutputs = conf.getInt("mapred.inmem.merge.threshold", 1000);
+      this.maxInMemCopyPer =
+        conf.getFloat("mapred.job.shuffle.merge.percent", 0.66f);
+      final float maxRedPer =
+        conf.getFloat("mapred.job.reduce.input.buffer.percent", 0f);
+      if (maxRedPer > 1.0 || maxRedPer < 0.0) {
+        throw new IOException("mapred.job.reduce.input.buffer.percent" +
+                              maxRedPer);
+      }
+      this.maxInMemReduce = (int)Math.min(
+          Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);
 
       // Setup the RamManager
       ramManager = new ShuffleRamManager(conf);
-      ramfsMergeOutputSize = 
-        (long)(MAX_INMEM_FILESYS_USE * ramManager.getMemoryLimit());
-      
+
       localFileSys = FileSystem.getLocal(conf);
 
       rfs = ((LocalFileSystem)localFileSys).getRaw();
@@ -2002,13 +2007,21 @@
         return mergeThrowable == null && copiedMapOutputs.size() == numMaps;
     }
     
-    private List<Segment<K, V>> createInMemorySegments() {
-      List<Segment<K, V>> inMemorySegments = 
-        new LinkedList<Segment<K, V>>();
+    private long createInMemorySegments(
+        List<Segment<K, V>> inMemorySegments, long leaveBytes)
+        throws IOException {
+      long totalSize = 0L;
       synchronized (mapOutputsFilesInMemory) {
-        while(mapOutputsFilesInMemory.size() > 0) {
+        // fullSize could come from the RamManager, but files can be
+        // closed but not yet present in mapOutputsFilesInMemory
+        long fullSize = 0L;
+        for (MapOutput mo : mapOutputsFilesInMemory) {
+          fullSize += mo.data.length;
+        }
+        while(fullSize > leaveBytes) {
           MapOutput mo = mapOutputsFilesInMemory.remove(0);
-          
+          totalSize += mo.data.length;
+          fullSize -= mo.data.length;
           Reader<K, V> reader = 
             new InMemoryReader<K, V>(ramManager, mo.mapAttemptId,
                                      mo.data, 0, mo.data.length);
@@ -2017,9 +2030,160 @@
           inMemorySegments.add(segment);
         }
       }
-      return inMemorySegments;
+      return totalSize;
     }
-    
+
+    /**
+     * Create a RawKeyValueIterator from copied map outputs. All copying
+     * threads have exited, so all of the map outputs are available either in
+     * memory or on disk. We also know that no merges are in progress, so
+     * synchronization is more lax here.
+     *
+     * The iterator returned must satisfy the following constraints:
+     *   1. Fewer than io.sort.factor files may be sources
+     *   2. No more than maxInMemReduce bytes of map outputs may be resident
+     *      in memory when the reduce begins
+     *
+     * If we must perform an intermediate merge to satisfy (1), then we can
+     * keep the excluded outputs from (2) in memory and include them in the
+     * first merge pass. If not, said outputs must first be written to disk.
+     * @return iterator over the merged in-memory and on-disk map outputs
+     */
+    @SuppressWarnings("unchecked")
+    private RawKeyValueIterator createKVIterator(
+        JobConf job, FileSystem fs, Reporter reporter) throws IOException {
+
+      // merge config params
+      Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
+      Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
+      boolean keepInputs = job.getKeepFailedTaskFiles(); // handed to each on-disk Segment below
+      final Path tmpDir = new Path(getTaskID().toString());
+      final RawComparator<K> comparator =
+        (RawComparator<K>)job.getOutputKeyComparator();
+
+      // segments required to vacate memory
+      List<Segment<K,V>> memDiskSegments = new ArrayList<Segment<K,V>>();
+      long inMemToDiskBytes = 0;
+      if (mapOutputsFilesInMemory.size() > 0) {
+        TaskID mapId = mapOutputsFilesInMemory.get(0).mapId; // first in-memory output names the spill file
+        inMemToDiskBytes = createInMemorySegments(memDiskSegments,
+            maxInMemReduce); // second argument is the byte budget allowed to stay in memory
+        final int numMemDiskSegments = memDiskSegments.size();
+        if (numMemDiskSegments > 0 &&
+              ioSortFactor > mapOutputFilesOnDisk.size()) {
+          // must spill to disk, but can't retain in-mem for intermediate merge
+          final Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
+                            reduceTask.getTaskID(), inMemToDiskBytes);
+          final RawKeyValueIterator rIter = Merger.merge(job, fs,
+              keyClass, valueClass, memDiskSegments, numMemDiskSegments,
+              tmpDir, comparator, reporter);
+          final Writer writer = new Writer(job, fs, outputPath,
+              keyClass, valueClass, codec);
+          try {
+            Merger.writeFile(rIter, writer, reporter);
+            addToMapOutputFilesOnDisk(fs.getFileStatus(outputPath));
+          } catch (Exception e) { // any failure: remove the partial spill file, then rethrow wrapped
+            if (null != outputPath) {
+              fs.delete(outputPath, true);
+            }
+            throw new IOException("Final merge failed", e);
+          } finally {
+            if (null != writer) {
+              writer.close();
+            }
+          }
+          LOG.info("Merged " + numMemDiskSegments + " segments, " +
+                   inMemToDiskBytes + " bytes to disk to satisfy " +
+                   "reduce memory limit");
+          inMemToDiskBytes = 0; // the spilled segments were written out; nothing is carried forward
+          memDiskSegments.clear();
+        } else if (inMemToDiskBytes != 0) {
+          LOG.info("Keeping " + numMemDiskSegments + " segments, " +
+                   inMemToDiskBytes + " bytes in memory for " +
+                   "intermediate, on-disk merge");
+        }
+      }
+
+      // segments on disk
+      List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
+      long onDiskBytes = inMemToDiskBytes; // retained in-memory segments join the disk merge below
+      Path[] onDisk = getMapFiles(fs, false);
+      for (Path file : onDisk) {
+        onDiskBytes += fs.getFileStatus(file).getLen();
+        diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs));
+      }
+      LOG.info("Merging " + onDisk.length + " files, " +
+               onDiskBytes + " bytes from disk");
+      Collections.sort(diskSegments, new Comparator<Segment<K,V>>() { // ascending by segment length
+        public int compare(Segment<K, V> o1, Segment<K, V> o2) {
+          if (o1.getLength() == o2.getLength()) {
+            return 0;
+          }
+          return o1.getLength() < o2.getLength() ? -1 : 1;
+        }
+      });
+
+      // build final list of segments from merged backed by disk + in-mem
+      List<Segment<K,V>> finalSegments = new ArrayList<Segment<K,V>>();
+      long inMemBytes = createInMemorySegments(finalSegments, 0); // budget 0: take every remaining in-memory output
+      LOG.info("Merging " + finalSegments.size() + " segments, " +
+               inMemBytes + " bytes from memory into reduce");
+      if (0 != onDiskBytes) {
+        final int numInMemSegments = memDiskSegments.size();
+        diskSegments.addAll(0, memDiskSegments); // retained in-memory segments go ahead of the sorted disk segments
+        memDiskSegments.clear();
+        RawKeyValueIterator diskMerge = Merger.merge(
+            job, fs, keyClass, valueClass, diskSegments,
+            ioSortFactor, numInMemSegments, tmpDir, comparator,
+            reporter, false);
+        diskSegments.clear();
+        if (0 == finalSegments.size()) { // nothing left in memory: the disk merge is the whole result
+          return diskMerge;
+        }
+        finalSegments.add(new Segment<K,V>( // expose the disk merge as one more segment of the final merge
+              new RawKVIteratorReader(diskMerge, onDiskBytes), true));
+      }
+      return Merger.merge(job, fs, keyClass, valueClass,
+                   finalSegments, finalSegments.size(), tmpDir,
+                   comparator, reporter);
+    }
+
+    class RawKVIteratorReader extends IFile.Reader<K,V> { // presents a RawKeyValueIterator as an IFile.Reader
+
+      private final RawKeyValueIterator kvIter; // wrapped source of key/value pairs
+
+      public RawKVIteratorReader(RawKeyValueIterator kvIter, long size)
+          throws IOException {
+        super(null, null, size, null); // only size is supplied; the other arguments are null
+        this.kvIter = kvIter;
+      }
+
+      public boolean next(DataInputBuffer key, DataInputBuffer value)
+          throws IOException {
+        if (kvIter.next()) { // advance the wrapped iterator; false means no more records
+          final DataInputBuffer kb = kvIter.getKey();
+          final DataInputBuffer vb = kvIter.getValue();
+          final int kp = kb.getPosition();
+          final int klen = kb.getLength() - kp; // bytes from the current position to the buffer's length
+          key.reset(kb.getData(), kp, klen); // point the caller's buffer at the iterator's data
+          final int vp = vb.getPosition();
+          final int vlen = vb.getLength() - vp;
+          value.reset(vb.getData(), vp, vlen);
+          bytesRead += klen + vlen; // running total of key and value bytes handed out
+          return true;
+        }
+        return false;
+      }
+
+      public long getPosition() throws IOException { // position is the byte total accumulated in next()
+        return bytesRead;
+      }
+
+      public void close() throws IOException { // closes the wrapped iterator
+        kvIter.close();
+      }
+    }
+
     private CopyResult getCopyResult(int numInFlight) {  
       synchronized (copyResults) {
         while (copyResults.isEmpty()) {
@@ -2258,7 +2422,9 @@
           boolean exit = false;
           do {
             exit = ramManager.waitForDataToMerge();
-            doInMemMerge();
+            if (!exit) {
+              doInMemMerge();
+            }
           } while (!exit);
         } catch (Throwable t) {
           LOG.warn(reduceTask.getTaskID() +
@@ -2284,19 +2450,20 @@
 
         //figure out the mapId 
         TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
-        
+
+        List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K,V>>();
+        long mergeOutputSize = createInMemorySegments(inMemorySegments, 0);
+        int noInMemorySegments = inMemorySegments.size();
+
         Path outputPath = mapOutputFile.getInputFileForWrite(mapId, 
-                          reduceTask.getTaskID(), ramfsMergeOutputSize);
+                          reduceTask.getTaskID(), mergeOutputSize);
 
         Writer writer = 
           new Writer(conf, rfs, outputPath,
                      conf.getMapOutputKeyClass(),
                      conf.getMapOutputValueClass(),
                      codec);
-        
-        List<Segment<K, V>> inMemorySegments = createInMemorySegments();
-        int noInMemorySegments = inMemorySegments.size();
-        
+
         RawKeyValueIterator rIter = null;
         final Reporter reporter = getReporter(umbilical);
         try {

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapCollection.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapCollection.java?rev=694459&r1=694458&r2=694459&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapCollection.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapCollection.java Thu Sep 11 13:26:11 2008
@@ -213,8 +213,12 @@
 
     public FakeIF() { }
 
-    public InputSplit[] getSplits(JobConf conf, int splits) {
-      return new InputSplit[] { new FakeSplit() };
+    public InputSplit[] getSplits(JobConf conf, int numSplits) { // conf is not read here
+      InputSplit[] splits = new InputSplit[numSplits]; // one FakeSplit per requested split
+      for (int i = 0; i < splits.length; ++i) {
+        splits[i] = new FakeSplit();
+      }
+      return splits;
     }
 
     public RecordReader<NullWritable,NullWritable> getRecordReader(

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java?rev=694459&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java Thu Sep 11 13:26:11 2008
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+import junit.extensions.TestSetup;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.TestMapCollection.FakeIF;
+import org.apache.hadoop.mapred.TestMapCollection.FakeSplit;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import static org.apache.hadoop.mapred.Task.FileSystemCounter.HDFS_WRITE;
+import static org.apache.hadoop.mapred.Task.FileSystemCounter.LOCAL_READ;
+
+public class TestReduceFetch extends TestCase { // checks file-system counters under different reduce buffer settings
+
+  private static MiniMRCluster mrCluster = null; // created in suite() setUp
+  private static MiniDFSCluster dfsCluster = null; // created in suite() setUp
+  public static Test suite() { // wraps the test class in a TestSetup that starts and stops the clusters
+    TestSetup setup = new TestSetup(new TestSuite(TestReduceFetch.class)) {
+      protected void setUp() throws Exception {
+        Configuration conf = new Configuration();
+        dfsCluster = new MiniDFSCluster(conf, 2, true, null);
+        mrCluster = new MiniMRCluster(2,
+            dfsCluster.getFileSystem().getUri().toString(), 1); // MR cluster is given the mini DFS URI
+      }
+      protected void tearDown() throws Exception { // NOTE(review): DFS is stopped before MR; confirm the order is intended
+        if (dfsCluster != null) { dfsCluster.shutdown(); }
+        if (mrCluster != null) { mrCluster.shutdown(); }
+      }
+    };
+    return setup;
+  }
+
+  public static class MapMB // mapper that ignores its input and emits fixed-size records
+      implements Mapper<NullWritable,NullWritable,Text,Text> {
+
+    public void map(NullWritable nk, NullWritable nv,
+        OutputCollector<Text, Text> output, Reporter reporter)
+        throws IOException {
+      Text key = new Text();
+      Text val = new Text();
+      key.set("KEYKEYKEYKEYKEYKEYKEYKEY");
+      byte[] b = new byte[1024]; // each value is 1024 bytes of 'V'
+      Arrays.fill(b, (byte)'V');
+      val.set(b);
+      b = null;
+      for (int i = 0; i < 1024; ++i) { // 1024 records per call, so about 1 MB of values
+        output.collect(key, val);
+      }
+    }
+    public void configure(JobConf conf) { }
+    public void close() throws IOException { }
+  }
+
+  public static Counters runJob(JobConf conf) throws Exception { // runs MapMB + IdentityReducer, returns the job counters
+    conf.setMapperClass(MapMB.class);
+    conf.setReducerClass(IdentityReducer.class);
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(Text.class);
+    conf.setNumMapTasks(3);
+    conf.setNumReduceTasks(1);
+    conf.setInputFormat(FakeIF.class);
+    FileInputFormat.setInputPaths(conf, new Path("/in"));
+    final Path outp = new Path("/out");
+    FileOutputFormat.setOutputPath(conf, outp);
+    SkipBadRecords.setEnabled(conf, false);
+    RunningJob job = null;
+    try {
+      job = JobClient.runJob(conf);
+      assertTrue(job.isSuccessful());
+    } finally { // always remove /out, whether or not the job succeeded
+      FileSystem fs = dfsCluster.getFileSystem();
+      if (fs.exists(outp)) {
+        fs.delete(outp, true);
+      }
+    }
+    return job.getCounters();
+  }
+
+  public void testReduceFromDisk() throws Exception { // reduce input buffer set to 0.0
+    JobConf job = mrCluster.createJobConf();
+    job.set("mapred.job.reduce.input.buffer.percent", "0.0");
+    Counters c = runJob(job);
+    assertTrue(c.findCounter(HDFS_WRITE).getCounter() <=
+               c.findCounter(LOCAL_READ).getCounter()); // local reads must be at least the bytes written to HDFS
+  }
+
+  public void testReduceFromPartialMem() throws Exception { // in-memory merge threshold 2, buffer 1.0
+    JobConf job = mrCluster.createJobConf();
+    job.setInt("mapred.inmem.merge.threshold", 2);
+    job.set("mapred.job.reduce.input.buffer.percent", "1.0");
+    Counters c = runJob(job);
+    assertTrue(c.findCounter(HDFS_WRITE).getCounter() >=
+               c.findCounter(LOCAL_READ).getCounter() + 1024 * 1024); // local reads at least 1 MB below HDFS writes
+  }
+
+  public void testReduceFromMem() throws Exception { // buffer 1.0 with the default merge threshold
+    JobConf job = mrCluster.createJobConf();
+    job.set("mapred.job.reduce.input.buffer.percent", "1.0");
+    Counters c = runJob(job);
+    assertTrue(c.findCounter(LOCAL_READ).getCounter() == 0); // expects no local file-system reads at all
+  }
+
+}