Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/12/08 02:53:09 UTC

svn commit: r483772 - in /lucene/hadoop/trunk: ./ conf/ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/util/ src/test/org/apache/hadoop/mapred/

Author: cutting
Date: Thu Dec  7 17:53:07 2006
New Revision: 483772

URL: http://svn.apache.org/viewvc?view=rev&rev=483772
Log:
HADOOP-331.  Write all map outputs to a single file with an index, rather than to a separate file per reduce task.  Contributed by Devaraj.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BufferSorter.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MergeSorter.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/util/MergeSort.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/build.xml
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CombiningCollector.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=483772&r1=483771&r2=483772
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Dec  7 17:53:07 2006
@@ -40,6 +40,11 @@
     jobs that were running when it was last stopped.
     (Sanjay Dahiya via cutting)
 
+12. HADOOP-331. Write all map outputs to a single file with an index,
+    rather than to a separate file per reduce task.  This should both
+    speed the shuffle and make things more scalable.
+    (Devaraj Das via cutting)
+
 
 Release 0.9.1 - 2006-12-06
 

Modified: lucene/hadoop/trunk/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/build.xml?view=diff&rev=483772&r1=483771&r2=483772
==============================================================================
--- lucene/hadoop/trunk/build.xml (original)
+++ lucene/hadoop/trunk/build.xml Thu Dec  7 17:53:07 2006
@@ -366,7 +366,7 @@
     <delete dir="${test.log.dir}"/>
     <mkdir dir="${test.log.dir}"/>
     <junit showoutput="${test.output}" printsummary="yes" haltonfailure="no" 
-           fork="yes" dir="${basedir}"
+           fork="yes" maxmemory="128m" dir="${basedir}"
       errorProperty="tests.failed" failureProperty="tests.failed">
       <sysproperty key="test.build.data" value="${test.build.data}"/>
       <sysproperty key="hadoop.log.dir" value="${test.log.dir}"/>

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=483772&r1=483771&r2=483772
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Thu Dec  7 17:53:07 2006
@@ -433,13 +433,6 @@
 </property>
 
 <property>
-  <name>mapred.combine.buffer.size</name>
-  <value>100000</value>
-  <description>The number of entries the combining collector caches before
-  combining them and writing to disk.</description>
-</property>
-
-<property>
   <name>mapred.speculative.execution</name>
   <value>true</value>
   <description>If true, then multiple instances of some map tasks may
@@ -572,6 +565,13 @@
   <name>io.seqfile.compression.type</name>
   <value>RECORD</value>
   <description>The default compression type for SequenceFile.Writer.
+  </description>
+</property>
+
+<property>
+  <name>map.sort.class</name>
+  <value>org.apache.hadoop.mapred.MergeSorter</value>
+  <description>The default sort class for sorting keys.
   </description>
 </property>
 

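For context, map.sort.class is read in MapTask via job.getClass("map.sort.class", MergeSorter.class, BufferSorter.class), so a job can override the default sorter. A minimal sketch, not part of this patch (MyJob is a placeholder; any replacement must implement the package-private BufferSorter interface and live in org.apache.hadoop.mapred):

    JobConf job = new JobConf(MyJob.class);
    // keep the shipped default, or name an alternative BufferSorter implementation
    job.set("map.sort.class", "org.apache.hadoop.mapred.MergeSorter");
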
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=483772&r1=483771&r2=483772
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Thu Dec  7 17:53:07 2006
@@ -37,6 +37,7 @@
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.hadoop.util.MergeSort;
 
 /** Support for flat files of binary key/value pairs. */
 public class SequenceFile {
@@ -284,6 +285,42 @@
     return writer;
   }
 
+  /**
+   * Construct the preferred type of 'raw' SequenceFile Writer.
+   * @param conf The configuration.
+   * @param out The stream on top of which the writer is to be constructed.
+   * @param keyClass The 'key' type.
+   * @param valClass The 'value' type.
+   * @param compressionType The compression type.
+   * @param codec The compression codec.
+   * @return Returns the handle to the constructed SequenceFile Writer.
+   * @throws IOException
+   */
+  public static Writer
+  createWriter(Configuration conf, FSDataOutputStream out, 
+      Class keyClass, Class valClass, CompressionType compressionType,
+      CompressionCodec codec)
+  throws IOException {
+    if ((codec instanceof GzipCodec) && 
+        !NativeCodeLoader.isNativeCodeLoaded() && 
+        !ZlibFactory.isNativeZlibLoaded()) {
+      throw new IllegalArgumentException("SequenceFile doesn't work with " +
+          "GzipCodec without native-hadoop code!");
+    }
+
+    Writer writer = null;
+
+    if (compressionType == CompressionType.NONE) {
+      writer = new Writer(conf, out, keyClass, valClass);
+    } else if (compressionType == CompressionType.RECORD) {
+      writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec);
+    } else if (compressionType == CompressionType.BLOCK){
+      writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec);
+    }
+    
+    return writer;
+  }
+
   /** The interface to 'raw' values of SequenceFiles. */
   public static interface ValueBytes {
 
@@ -505,6 +542,10 @@
 
     /** Returns the compression codec of data in this file. */
     public CompressionCodec getCompressionCodec() { return codec; }
+    
+    /** create a sync point */
+    public void sync() throws IOException {
+    }
 
     /** Returns the configuration of this file. */
     Configuration getConf() { return conf; }
@@ -686,6 +727,10 @@
       val.writeCompressedBytes(out);              // 'value' data
     }
     
+
+    public void sync() throws IOException {
+    }
+   
   } // RecordCompressionWriter
 
   /** Write compressed key/value blocks to a sequence-format file. */
@@ -804,6 +849,10 @@
       }
     }
 
+    public void sync() throws IOException {
+      writeBlock();
+    }
+
     /** Append a key/value pair. */
     public synchronized void append(Writable key, Writable val)
       throws IOException {
@@ -1508,6 +1557,8 @@
 
     private WritableComparator comparator;
 
+    private MergeSort mergeSort; //the implementation of merge sort
+    
     private Path[] inFiles;                     // when merging or sorting
 
     private Path outFile;
@@ -1612,6 +1663,7 @@
     private int sortPass(boolean deleteInput) throws IOException {
       LOG.debug("running sort pass");
       SortPass sortPass = new SortPass();         // make the SortPass
+      mergeSort = new MergeSort(sortPass.new SeqFileComparator());
       try {
         return sortPass.run(deleteInput);         // run it
       } finally {
@@ -1775,11 +1827,7 @@
           int p = pointers[i];
           writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
         }
-        if (writer instanceof SequenceFile.BlockCompressWriter) {
-          SequenceFile.BlockCompressWriter bcWriter = 
-            (SequenceFile.BlockCompressWriter) writer;
-          bcWriter.writeBlock();
-        }
+        writer.sync();
         writer.out.flush();
         
         
@@ -1793,50 +1841,14 @@
 
       private void sort(int count) {
         System.arraycopy(pointers, 0, pointersCopy, 0, count);
-        mergeSort(pointersCopy, pointers, 0, count);
+        mergeSort.mergeSort(pointersCopy, pointers, 0, count);
       }
-
-      private int compare(int i, int j) {
-        return comparator.compare(rawBuffer, keyOffsets[i], keyLengths[i],
-                                  rawBuffer, keyOffsets[j], keyLengths[j]);
-      }
-
-      private void mergeSort(int src[], int dest[], int low, int high) {
-        int length = high - low;
-
-        // Insertion sort on smallest arrays
-        if (length < 7) {
-          for (int i=low; i<high; i++)
-            for (int j=i; j>low && compare(dest[j-1], dest[j])>0; j--)
-              swap(dest, j, j-1);
-          return;
-        }
-
-        // Recursively sort halves of dest into src
-        int mid = (low + high) >> 1;
-        mergeSort(dest, src, low, mid);
-        mergeSort(dest, src, mid, high);
-
-        // If list is already sorted, just copy from src to dest.  This is an
-        // optimization that results in faster sorts for nearly ordered lists.
-        if (compare(src[mid-1], src[mid]) <= 0) {
-          System.arraycopy(src, low, dest, low, length);
-          return;
+      class SeqFileComparator implements Comparator<IntWritable> {
+        public int compare(IntWritable I, IntWritable J) {
+          return comparator.compare(rawBuffer, keyOffsets[I.get()], 
+                                    keyLengths[I.get()], rawBuffer, 
+                                    keyOffsets[J.get()], keyLengths[J.get()]);
         }
-
-        // Merge sorted halves (now in src) into dest
-        for (int i = low, p = low, q = mid; i < high; i++) {
-          if (q>=high || p<mid && compare(src[p], src[q]) <= 0)
-            dest[i] = src[p++];
-          else
-            dest[i] = src[q++];
-        }
-      }
-
-      private void swap(int x[], int a, int b) {
-        int t = x[a];
-        x[a] = x[b];
-        x[b] = t;
       }
     } // SequenceFile.Sorter.SortPass
 
@@ -1898,7 +1910,36 @@
         s.doSync();
         a.add(s);
       }
-      factor = inNames.length;
+      factor = (inNames.length < factor) ? inNames.length : factor;
+      MergeQueue mQueue = new MergeQueue(a);
+      return mQueue.merge();
+    }
+
+    /**
+     * Merges the contents of files passed in Path[]
+     * @param inNames the array of path names
+     * @param tempDir the directory for creating temp files during merge
+     * @param deleteInputs true if the input files should be deleted when 
+     * unnecessary
+     * @return RawKeyValueIterator
+     * @throws IOException
+     */
+    public RawKeyValueIterator merge(Path [] inNames, Path tempDir, 
+                                     boolean deleteInputs) 
+    throws IOException {
+      //outFile will basically be used as prefix for temp files for the
+      //intermediate merge outputs           
+      this.outFile = new Path(tempDir + Path.SEPARATOR + "merged");
+      //get the segments from inNames
+      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
+      for (int i = 0; i < inNames.length; i++) {
+        SegmentDescriptor s = new SegmentDescriptor(0, 
+                              fs.getLength(inNames[i]), inNames[i]);
+        s.preserveInput(!deleteInputs);
+        s.doSync();
+        a.add(s);
+      }
+      factor = (inNames.length < factor) ? inNames.length : factor;
       MergeQueue mQueue = new MergeQueue(a);
       return mQueue.merge();
     }
@@ -1916,7 +1957,7 @@
      */
     public Writer cloneFileAttributes(FileSystem fileSys, Path inputFile, 
                   Path outputFile, Progressable prog) throws IOException {
-      Reader reader = new Reader(fileSys, inputFile, memory/(factor+1), conf);
+      Reader reader = new Reader(fileSys, inputFile, 4096, conf);
       boolean compress = reader.isCompressed();
       boolean blockCompress = reader.isBlockCompressed();
       CompressionCodec codec = reader.getCompressionCodec();
@@ -1944,11 +1985,7 @@
         writer.appendRaw(records.getKey().getData(), 0, 
                          records.getKey().getLength(), records.getValue());
       }
-      if (writer instanceof SequenceFile.BlockCompressWriter) {
-        SequenceFile.BlockCompressWriter bcWriter =
-                        (SequenceFile.BlockCompressWriter) writer;
-        bcWriter.writeBlock();
-      }
+      writer.sync();
     }
         
     /** Merge the provided files.
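
The stream-based createWriter and the no-arg Writer.sync() added above let several logical SequenceFile segments share one physical file, which is what the new map-output format relies on. A rough sketch of the pattern (not code from the patch; the paths, key/value types and the separate index stream are assumptions, and the usual io/fs imports are omitted):

    FSDataOutputStream out = fs.create(dataFile);        // shared data stream
    FSDataOutputStream indexOut = fs.create(indexFile);  // hypothetical index stream
    long segmentStart = out.getPos();
    SequenceFile.Writer writer = SequenceFile.createWriter(conf, out,
        Text.class, IntWritable.class, CompressionType.BLOCK, new DefaultCodec());
    writer.append(new Text("key"), new IntWritable(1));
    writer.sync();               // for BlockCompressWriter this flushes the block
    out.flush();
    indexOut.writeLong(segmentStart);                 // where the segment starts
    indexOut.writeLong(out.getPos() - segmentStart);  // how many bytes it spans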

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java?view=auto&rev=483772
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java Thu Dec  7 17:53:07 2006
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.SequenceFile.ValueBytes;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
+
+
+/** This class implements the sort interface using primitive int arrays as 
+ * the data structures (that is why this class is called 'BasicType'SorterBase)
+ * @author ddas
+ */
+abstract class BasicTypeSorterBase implements BufferSorter {
+  
+  protected DataOutputBuffer keyValBuffer; //the buffer used for storing
+                                           //key/values
+  protected int[] startOffsets; //the array used to store the start offsets of
+                                //keys in keyValBuffer
+  protected int[] keyLengths; //the array used to store the lengths of
+                              //keys
+  protected int[] valueLengths; //the array used to store the value lengths 
+  protected int[] pointers; //the array of startOffsets's indices. This will
+                            //be sorted at the end to contain a sorted array of
+                            //indices to offsets
+  protected WritableComparator comparator; //the comparator for the map output
+  protected int count; //the number of key/values
+  //the overhead of the arrays in memory 
+  //16 => 4 for startOffsets, 4 for keyLengths, 4 for valueLengths, and
+  //4 for indices into startOffsets array in the
+  //pointers array (ignored the partpointers list itself)
+  private final int BUFFERED_KEY_VAL_OVERHEAD = 16;
+
+  //Implementation of methods of the SorterBase interface
+  //
+  public void configure(JobConf conf) {
+    startOffsets = new int[1024];
+    keyLengths = new int[1024];
+    valueLengths = new int[1024];
+    pointers = new int[1024];
+    comparator = conf.getOutputKeyComparator();
+  }
+  
+  public void addKeyValue(int recordOffset, int keyLength, int valLength) {
+    //Add the start offset of the key in the startOffsets array and the
+    //length in the keyLengths array.
+    if (count == startOffsets.length)
+      grow();
+    startOffsets[count] = recordOffset;
+    keyLengths[count] = keyLength;
+    valueLengths[count] = valLength;
+    pointers[count] = count;
+    count++;
+  }
+
+  public void setInputBuffer(DataOutputBuffer buffer) {
+    //store a reference to the keyValBuffer that we need to read during sort
+    this.keyValBuffer = buffer;
+  }
+
+  public long getMemoryUtilized() {
+    return (startOffsets.length) * BUFFERED_KEY_VAL_OVERHEAD;
+  }
+
+  public abstract RawKeyValueIterator sort();
+  
+  public void close() {
+    //just set count to 0; we reuse the arrays
+    count = 0;
+  }
+  //A compare method that references the keyValBuffer through the indirect
+  //pointers
+  protected int compare(int i, int j) {
+    return comparator.compare(keyValBuffer.getData(), startOffsets[i],
+                              keyLengths[i],
+                              keyValBuffer.getData(), startOffsets[j], 
+                              keyLengths[j]);
+  }
+  
+  private void grow() {
+    int newLength = startOffsets.length * 3/2;
+    startOffsets = grow(startOffsets, newLength);
+    keyLengths = grow(keyLengths, newLength);
+    valueLengths = grow(valueLengths, newLength);
+    pointers = grow(pointers, newLength);
+  }
+  
+  private int[] grow(int[] old, int newLength) {
+    int[] result = new int[newLength];
+    System.arraycopy(old, 0, result, 0, old.length);
+    return result;
+  }
+} //BasicTypeSorterBase
+
+//Implementation of methods of the RawKeyValueIterator interface. These
+//methods must be invoked to iterate over key/vals after sort is done.
+//
+class MRSortResultIterator implements RawKeyValueIterator {
+  
+  private int count;
+  private int[] pointers;
+  private int[] startOffsets;
+  private int[] keyLengths;
+  private int[] valLengths;
+  private int currStartOffsetIndex;
+  private int currIndexInPointers;
+  private DataOutputBuffer keyValBuffer;
+  private DataOutputBuffer key = new DataOutputBuffer();
+  private InMemUncompressedBytes value = new InMemUncompressedBytes();
+  
+  public MRSortResultIterator(DataOutputBuffer keyValBuffer, 
+                              int []pointers, int []startOffsets,
+                              int []keyLengths, int []valLengths) {
+    this.count = pointers.length;
+    this.pointers = pointers;
+    this.startOffsets = startOffsets;
+    this.keyLengths = keyLengths;
+    this.valLengths = valLengths;
+    this.keyValBuffer = keyValBuffer;
+  }
+  
+  public Progress getProgress() {
+    return null;
+  }
+  
+  public DataOutputBuffer getKey() throws IOException {
+    int currKeyOffset = startOffsets[currStartOffsetIndex];
+    int currKeyLength = keyLengths[currStartOffsetIndex];
+    //reuse the same key
+    key.reset();
+    key.write(keyValBuffer.getData(), currKeyOffset, currKeyLength);
+    return key;
+  }
+
+  public ValueBytes getValue() throws IOException {
+    //value[i] is stored in the following byte range:
+    //startOffsets[i] + keyLengths[i] through valLengths[i]
+    value.reset(keyValBuffer,
+       startOffsets[currStartOffsetIndex] + keyLengths[currStartOffsetIndex],
+       valLengths[currStartOffsetIndex]);
+    return value;
+  }
+
+  public boolean next() throws IOException {
+    if (count == currIndexInPointers)
+      return false;
+    currStartOffsetIndex = pointers[currIndexInPointers];
+    currIndexInPointers++;
+    return true;
+  }
+  
+  public void close() {
+    return;
+  }
+  
+  //An implementation of the ValueBytes interface for the in-memory value
+  //buffers. 
+  private class InMemUncompressedBytes implements ValueBytes {
+    private byte[] data;
+    int start;
+    int dataSize;
+    private void reset(DataOutputBuffer d, int start, int length) 
+    throws IOException {
+      data = d.getData();
+      this.start = start;
+      dataSize = length;
+    }
+            
+    public int getSize() throws IOException {
+      return dataSize;
+    }
+            
+    public void writeUncompressedBytes(DataOutputStream outStream)
+    throws IOException {
+      outStream.write(data, start, dataSize);
+    }
+
+    public void writeCompressedBytes(DataOutputStream outStream) 
+    throws IllegalArgumentException, IOException {
+      throw
+      new IllegalArgumentException("UncompressedBytes cannot be compressed!");
+    }
+  
+  } // InMemUncompressedBytes
+
+} //MRSortResultIterator

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BufferSorter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BufferSorter.java?view=auto&rev=483772
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BufferSorter.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BufferSorter.java Thu Dec  7 17:53:07 2006
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
+
+/** This class provides a generic sort interface that should be implemented
+ * by specific sort algorithms. The use case is the following:
+ * A user class writes key/value records to a buffer, and finally wants to
+ * sort the buffer. This interface defines methods by which the user class
+ * can update the interface implementation with the offsets of the records
+ * and the lengths of the keys/values. The user class gives a reference to
+ * the buffer when it wishes to sort the records written to the buffer
+ * so far. Typically, the user class decides the point at which sort should
+ * happen based on the memory consumed so far by the buffer and the data
+ * structures maintained by an implementation of this interface. That is why
+ * a method is provided to get the memory consumed so far by the datastructures
+ * in the interface implementation.  
+ * @author ddas
+ *
+ */
+interface BufferSorter extends JobConfigurable {
+  
+  /** When a key/value is added at a particular offset in the key/value buffer, 
+   * this method is invoked by the user class so that the impl of this sort 
+   * interface can update its datastructures. 
+   * @param recordOffset the offset of the key in the buffer
+   * @param keyLength the length of the key
+   * @param valLength the length of the val in the buffer
+   */
+  public void addKeyValue(int recordOffset, int keyLength, int valLength);
+  
+  /** The user class invokes this method to set the buffer that the specific 
+   * sort algorithm should "indirectly" sort (generally, sort algorithm impl 
+   * should access this buffer via comparators and sort offset-indices to the
+   * buffer).
+   * @param buffer the map output buffer
+   */
+  public void setInputBuffer(DataOutputBuffer buffer);
+  
+  /** The framework invokes this method to get the memory consumed so far
+   * by an implementation of this interface.
+   * @return memoryUsed in bytes 
+   */
+  public long getMemoryUtilized();
+  
+  /** Framework decides when to actually sort
+   */
+  public RawKeyValueIterator sort();
+  
+  /** Framework invokes this to signal the sorter to cleanup
+   */
+  public void close();
+}
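
To make the calling sequence described in the javadoc concrete, this is roughly how MapTask.MapOutputBuffer drives an implementation (a condensed sketch, not code from the patch; job, key, value and maxBufferSize are assumed to be in scope, with maxBufferSize corresponding to io.sort.mb):

    BufferSorter sorter = new MergeSorter();    // normally chosen via map.sort.class
    sorter.configure(job);                      // JobConfigurable hook
    DataOutputBuffer buf = new DataOutputBuffer();
    // for every map output record (key/value are the map's output Writables):
    int keyOffset = buf.getLength();
    key.write(buf);
    int keyLength = buf.getLength() - keyOffset;
    value.write(buf);
    int valLength = buf.getLength() - keyOffset - keyLength;
    sorter.addKeyValue(keyOffset, keyLength, valLength);
    // spill once the raw buffer plus the sorter's bookkeeping exceed the limit:
    if (buf.getLength() + sorter.getMemoryUtilized() >= maxBufferSize) {
      sorter.setInputBuffer(buf);
      RawKeyValueIterator iter = sorter.sort();  // null when nothing was buffered
      // ... write iter out (optionally through the combiner), then reset:
      sorter.close();
      buf.reset();
    }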

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CombiningCollector.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CombiningCollector.java?view=diff&rev=483772&r1=483771&r2=483772
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CombiningCollector.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CombiningCollector.java Thu Dec  7 17:53:07 2006
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import java.io.*;
-import java.util.*;
-
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.util.ReflectionUtils;
-
-/** Implements partial value reduction during mapping.  This can minimize the
- * size of intermediate data.  Buffers a list of values for each unique key,
- * then invokes the combiner's reduce method to merge some values before
- * they're transferred to a reduce node. */
-class CombiningCollector implements OutputCollector {
-  private int limit;
-
-  private int count = 0;
-  private Map keyToValues;                        // the buffer
-
-  private JobConf job;
-  private OutputCollector out;
-  private Reducer combiner;
-  private Reporter reporter;
-
-  public CombiningCollector(JobConf job, OutputCollector out,
-                            Reporter reporter) {
-    this.job = job;
-    this.out = out;
-    this.reporter = reporter;
-    this.combiner = (Reducer)ReflectionUtils.newInstance(job.getCombinerClass(),
-                                                         job);
-    this.keyToValues = new TreeMap(job.getOutputKeyComparator());
-    this.limit = job.getInt("mapred.combine.buffer.size", 100000);
-  }
-
-  public synchronized void collect(WritableComparable key, Writable value)
-    throws IOException {
-
-    // buffer new value in map
-    ArrayList values = (ArrayList)keyToValues.get(key);
-    Writable valueClone = WritableUtils.clone(value, job);
-    if (values == null) {
-      // this is a new key, so create a new list
-      values = new ArrayList(1);
-      values.add(valueClone);
-      Writable keyClone = WritableUtils.clone(key, job);
-      keyToValues.put(keyClone, values);
-    } else {
-      // other values for this key, so just add.
-      values.add(valueClone);
-    }
-
-    count++;
-
-    if (count >= this.limit) {                         // time to flush
-      flush();
-    }
-  }
-
-  public synchronized void flush() throws IOException {
-    Iterator pairs = keyToValues.entrySet().iterator();
-    while (pairs.hasNext()) {
-      Map.Entry pair = (Map.Entry)pairs.next();
-      combiner.reduce((WritableComparable)pair.getKey(),
-                      ((ArrayList)pair.getValue()).iterator(),
-                      out, reporter);
-    }
-    keyToValues.clear();
-    count = 0;
-  }
-  
-  public synchronized void close() throws IOException { 
-    combiner.close(); 
-  }
-
-}

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=483772&r1=483771&r2=483772
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Thu Dec  7 17:53:07 2006
@@ -115,7 +115,7 @@
         String reduceId = "reduce_" + newId();
         for (int i = 0; i < mapIds.size(); i++) {
           String mapId = (String)mapIds.get(i);
-          Path mapOut = this.mapoutputFile.getOutputFile(mapId, 0);
+          Path mapOut = this.mapoutputFile.getOutputFile(mapId);
           Path reduceIn = this.mapoutputFile.getInputFile(i, reduceId);
           if (!localFs.mkdirs(reduceIn.getParent())) {
             throw new IOException("Mkdirs failed to create " + 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java?view=diff&rev=483772&r1=483771&r2=483772
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java Thu Dec  7 17:53:07 2006
@@ -32,11 +32,36 @@
   
   /** Create a local map output file name.
    * @param mapTaskId a map task id
-   * @param partition a reduce partition
    */
-  public Path getOutputFile(String mapTaskId, int partition)
+  public Path getOutputFile(String mapTaskId)
     throws IOException {
-    return conf.getLocalPath(mapTaskId+"/part-"+partition+".out");
+    return conf.getLocalPath(mapTaskId+"/file.out");
+  }
+
+  /** Create a local map output index file name.
+   * @param mapTaskId a map task id
+   */
+  public Path getOutputIndexFile(String mapTaskId)
+    throws IOException {
+    return conf.getLocalPath(mapTaskId+"/file.out.index");
+  }
+
+  /** Create a local map spill file name.
+   * @param mapTaskId a map task id
+   * @param spillNumber the number
+   */
+  public Path getSpillFile(String mapTaskId, int spillNumber)
+    throws IOException {
+    return conf.getLocalPath(mapTaskId+"/spill" +spillNumber+".out");
+  }
+
+  /** Create a local map spill index file name.
+   * @param mapTaskId a map task id
+   * @param spillNumber the number
+   */
+  public Path getSpillIndexFile(String mapTaskId, int spillNumber)
+    throws IOException {
+    return conf.getLocalPath(mapTaskId+"/spill" +spillNumber+".out.index");
   }
 
   /** Create a local reduce input file name.
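
Taken together, these methods give each map task a small, fixed set of local files instead of one file per reduce. A sketch of the resulting layout (taskId is a placeholder; every path is resolved under mapred.local.dir by conf.getLocalPath):

    Path out      = mapOutputFile.getOutputFile(taskId);         // <taskId>/file.out
    Path outIndex = mapOutputFile.getOutputIndexFile(taskId);    // <taskId>/file.out.index
    Path spill0   = mapOutputFile.getSpillFile(taskId, 0);       // <taskId>/spill0.out
    Path spill0Ix = mapOutputFile.getSpillIndexFile(taskId, 0);  // <taskId>/spill0.out.index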

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=483772&r1=483771&r2=483772
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Thu Dec  7 17:53:07 2006
@@ -19,11 +19,14 @@
 package org.apache.hadoop.mapred;
 
 import java.io.*;
+import java.util.*;
 
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.SequenceFile.Sorter;
+import org.apache.hadoop.io.SequenceFile.Sorter.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -32,6 +35,9 @@
 import org.apache.commons.logging.*;
 import org.apache.hadoop.metrics.Metrics;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
+
 
 /** A Map task. */
 class MapTask extends Task {
@@ -127,14 +133,117 @@
   public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
 
-    // open output files
-    final int partitions = job.getNumReduceTasks();
-    final SequenceFile.Writer[] outs = new SequenceFile.Writer[partitions];
+    Reporter reporter = getReporter(umbilical, getProgress());
+
+    MapOutputBuffer collector = new MapOutputBuffer(umbilical, job, reporter);
+      
+    final RecordReader rawIn =                  // open input
+      job.getInputFormat().getRecordReader
+      (FileSystem.get(job), split, job, reporter);
+
+    RecordReader in = new RecordReader() {      // wrap in progress reporter
+        private float perByte = 1.0f /(float)split.getLength();
+
+        public WritableComparable createKey() {
+          return rawIn.createKey();
+        }
+          
+        public Writable createValue() {
+          return rawIn.createValue();
+        }
+         
+        public synchronized boolean next(Writable key, Writable value)
+          throws IOException {
+
+          float progress =                        // compute progress
+            (float)Math.min((rawIn.getPos()-split.getStart())*perByte, 1.0f);
+          reportProgress(umbilical, progress);
+          long beforePos = getPos();
+          boolean ret = rawIn.next(key, value);
+          myMetrics.mapInput(getPos() - beforePos);
+          return ret;
+        }
+        public long getPos() throws IOException { return rawIn.getPos(); }
+        public void close() throws IOException { rawIn.close(); }
+      };
+
+    MapRunnable runner =
+      (MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
+
     try {
-      Reporter reporter = getReporter(umbilical, getProgress());
-      FileSystem localFs = FileSystem.getNamed("local", job);
-      CompressionCodec codec = null;
-      CompressionType compressionType = CompressionType.NONE;
+      runner.run(in, collector, reporter);      // run the map
+    } finally {
+      in.close();                               // close input
+      //check whether the length of the key/value buffer is 0. If not, then
+      //we need to spill that to disk. Note that we reset the key/val buffer
+      //upon each spill (so a length > 0 means there is data not yet spilled)
+      if (((MapOutputBuffer)collector).keyValBuffer.getLength() > 0) {
+        ((MapOutputBuffer)collector).sortAndSpillToDisk();
+      }
+      //merge the partitions from the spilled files and create one output
+      collector.mergeParts();
+      //close
+      collector.close();
+    }
+    done(umbilical);
+  }
+
+  public void setConf(Configuration conf) {
+    if (conf instanceof JobConf) {
+      this.conf = (JobConf) conf;
+    } else {
+      this.conf = new JobConf(conf);
+    }
+    this.mapOutputFile.setConf(this.conf);
+  }
+
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  class MapOutputBuffer implements OutputCollector {
+
+    private final int partitions;
+    private Partitioner partitioner;
+    private TaskUmbilicalProtocol umbilical;
+    private JobConf job;
+    private Reporter reporter;
+
+    private DataOutputBuffer keyValBuffer; //the buffer where key/val will
+                                           //be stored before they are 
+                                           //spilled to disk
+    private int maxBufferSize; //the max amount of in-memory space after which
+                               //we will spill the keyValBuffer to disk
+    private int numSpills; //maintains the no. of spills to disk done so far
+    
+    private FileSystem localFs;
+    private CompressionCodec codec;
+    private CompressionType compressionType;
+    private Class keyClass;
+    private Class valClass;
+    private WritableComparator comparator;
+    private BufferSorter []sortImpl;
+    private SequenceFile.Writer writer;
+    private FSDataOutputStream out;
+    private FSDataOutputStream indexOut;
+    private long segmentStart;
+    public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job, 
+            Reporter reporter) throws IOException {
+      this.partitions = job.getNumReduceTasks();
+      this.partitioner = (Partitioner)ReflectionUtils.newInstance(
+                                      job.getPartitionerClass(), job);
+      maxBufferSize = job.getInt("io.sort.mb", 100) * 1024 * 1024;
+      keyValBuffer = new DataOutputBuffer();
+
+      this.umbilical = umbilical;
+      this.job = job;
+      this.reporter = reporter;
+      this.comparator = job.getOutputKeyComparator();
+      this.keyClass = job.getMapOutputKeyClass();
+      this.valClass = job.getMapOutputValueClass();
+      this.localFs = FileSystem.getNamed("local", job);
+      this.codec = null;
+      this.compressionType = CompressionType.NONE;
       if (job.getCompressMapOutput()) {
         // find the kind of compression to do, defaulting to record
         compressionType = job.getMapOutputCompressionType();
@@ -145,106 +254,249 @@
         codec = (CompressionCodec) 
                    ReflectionUtils.newInstance(codecClass, job);
       }
-      for (int i = 0; i < partitions; i++) {
-        Path filename = mapOutputFile.getOutputFile(getTaskId(), i);
-        outs[i] =
-          SequenceFile.createWriter(localFs, job, filename,
-                                    job.getMapOutputKeyClass(),
-                                    job.getMapOutputValueClass(),
-                                    compressionType, codec, reporter);
-        LOG.info("opened "+this.mapOutputFile.getOutputFile(getTaskId(), i).getName());
-      }
-
-      final Partitioner partitioner =
-        (Partitioner)ReflectionUtils.newInstance(job.getPartitionerClass(), job);
-
-      OutputCollector partCollector = new OutputCollector() { // make collector
-          public synchronized void collect(WritableComparable key,
-                                           Writable value)
-            throws IOException {
-            SequenceFile.Writer out = outs[partitioner.getPartition(key, value, partitions)];
-            long beforePos = out.getLength();
-            out.append(key, value);
-            reportProgress(umbilical);
-            myMetrics.mapOutput(out.getLength() - beforePos);
-          }
-        };
-
-      OutputCollector collector = partCollector;
-
-      boolean combining = job.getCombinerClass() != null;
-      if (combining) {                            // add combining collector
-        collector = new CombiningCollector(job, partCollector, reporter);
+      sortImpl = new BufferSorter[partitions];
+      for (int i = 0; i < partitions; i++)
+        sortImpl[i] = (BufferSorter)ReflectionUtils.newInstance(
+                   job.getClass("map.sort.class", MergeSorter.class,
+                   BufferSorter.class), job);
+    }
+    public void startPartition(int partNumber) throws IOException {
+      //We create the sort output as multiple sequence files within a spilled
+      //file. So we create a writer for each partition. 
+      segmentStart = out.getPos();
+      writer =
+          SequenceFile.createWriter(job, out, job.getMapOutputKeyClass(),
+                  job.getMapOutputValueClass(), compressionType, codec);
+    }
+    private void endPartition(int partNumber) throws IOException {
+      //Need to write syncs especially if block compression is in use
+      //We also update the index file to contain the part offsets per 
+      //spilled file
+      writer.sync();
+      indexOut.writeLong(segmentStart);
+      //we also store 0 length key/val segments to make the merge phase easier.
+      indexOut.writeLong(out.getPos()-segmentStart);
+    }
+    
+    public void collect(WritableComparable key,
+              Writable value) throws IOException {
+      synchronized (this) {
+        //dump the key/value to buffer
+        int keyOffset = keyValBuffer.getLength(); 
+        key.write(keyValBuffer);
+        int keyLength = keyValBuffer.getLength() - keyOffset;
+        value.write(keyValBuffer);
+        int valLength = keyValBuffer.getLength() - (keyOffset + keyLength);
+      
+        int partNumber = partitioner.getPartition(key, value, partitions);
+        sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength);
+
+        reportProgress(umbilical); 
+        myMetrics.mapOutput(keyValBuffer.getLength() - keyOffset);
+
+        //now check whether we need to spill to disk
+        long totalMem = 0;
+        for (int i = 0; i < partitions; i++)
+          totalMem += sortImpl[i].getMemoryUtilized();
+        if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) {
+          sortAndSpillToDisk();
+          keyValBuffer.reset();
+          for (int i = 0; i < partitions; i++)
+            sortImpl[i].close(); 
+        }
       }
-
-      final RecordReader rawIn =                  // open input
-        job.getInputFormat().getRecordReader
-        (FileSystem.get(job), split, job, reporter);
-
-      RecordReader in = new RecordReader() {      // wrap in progress reporter
-          private float perByte = 1.0f /(float)split.getLength();
-
-          public WritableComparable createKey() {
-            return rawIn.createKey();
-          }
+    }
+    
+    //sort, combine and spill to disk
+    private void sortAndSpillToDisk() throws IOException {
+      synchronized (this) {
+        Path filename = mapOutputFile.getSpillFile(getTaskId(), numSpills);
+        //we just create the FSDataOutputStream object here.
+        out = localFs.create(filename);
+        Path indexFilename = mapOutputFile.getSpillIndexFile(getTaskId(), 
+                                                             numSpills);
+        indexOut = localFs.create(indexFilename);
+        LOG.info("opened "+
+        mapOutputFile.getSpillFile(getTaskId(), numSpills).getName());
           
-          public Writable createValue() {
-            return rawIn.createValue();
-          }
+        //invoke the sort
+        for (int i = 0; i < partitions; i++) {
+          sortImpl[i].setInputBuffer(keyValBuffer);
+          RawKeyValueIterator rIter = sortImpl[i].sort();
           
-          public synchronized boolean next(Writable key, Writable value)
-            throws IOException {
-
-            float progress =                        // compute progress
-              (float)Math.min((rawIn.getPos()-split.getStart())*perByte, 1.0f);
-            reportProgress(umbilical, progress);
-
-            long beforePos = getPos();
-            boolean ret = rawIn.next(key, value);
-            myMetrics.mapInput(getPos() - beforePos);
-            return ret;
+          startPartition(i);
+          if (rIter != null) {
+            //invoke the combiner if one is defined
+            if (job.getCombinerClass() != null) {
+              //we instantiate and close the combiner for each partition. This
+              //is required for streaming where the combiner runs as a separate
+              //process and we want to make sure that the combiner process has
+              //got all the input key/val, processed, and output the result 
+              //key/vals before we write the partition header in the output file
+              Reducer combiner = (Reducer)ReflectionUtils.newInstance(
+                                         job.getCombinerClass(), job);
+              // make collector
+              OutputCollector combineCollector = new OutputCollector() {
+                public void collect(WritableComparable key, Writable value)
+                  throws IOException {
+                  synchronized (this) {
+                    writer.append(key, value);
+                  }
+                }
+              };
+              combineAndSpill(rIter, combiner, combineCollector);
+              combiner.close();
+            }
+            else //just spill the sorted data
+              spill(rIter);
           }
-          public long getPos() throws IOException { return rawIn.getPos(); }
-          public void close() throws IOException { rawIn.close(); }
-        };
-
-      MapRunnable runner =
-        (MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
+          endPartition(i);
+        }
+        numSpills++;
+        out.close();
+        indexOut.close();
+      }
+    }
+    
+    private void combineAndSpill(RawKeyValueIterator resultIter, 
+    Reducer combiner, OutputCollector combineCollector) throws IOException {
+      //combine the key/value obtained from the offset & indices arrays.
+      CombineValuesIterator values = new CombineValuesIterator(resultIter,
+              comparator, keyClass, valClass, umbilical, job);
+      while (values.more()) {
+        combiner.reduce(values.getKey(), values, combineCollector, reporter);
+        values.nextKey();
+      }
+    }
+    
+    private void spill(RawKeyValueIterator resultIter) throws IOException {
+      Writable key = null;
+      Writable value = null;
 
       try {
-        runner.run(in, collector, reporter);      // run the map
+        key = (WritableComparable)ReflectionUtils.newInstance(keyClass, job);
+        value = (Writable)ReflectionUtils.newInstance(valClass, job);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
 
-        if (combining) {                          // flush combiner
-          ((CombiningCollector)collector).flush();
-        }
+      DataInputBuffer keyIn = new DataInputBuffer();
+      DataInputBuffer valIn = new DataInputBuffer();
+      DataOutputBuffer valOut = new DataOutputBuffer();
+      while (resultIter.next()) {
+        keyIn.reset(resultIter.getKey().getData(), 
+                    resultIter.getKey().getLength());
+        key.readFields(keyIn);
+        valOut.reset();
+        (resultIter.getValue()).writeUncompressedBytes(valOut);
+        valIn.reset(valOut.getData(), valOut.getLength());
+        value.readFields(valIn);
 
-      } finally {
-        if (combining) { 
-          ((CombiningCollector)collector).close(); 
-        } 
-        in.close();                               // close input
+        writer.append(key, value);
       }
-    } finally {
-      for (int i = 0; i < partitions; i++) {      // close output
-        if (outs[i] != null) {
-          outs[i].close();
+    }
+    
+    public void mergeParts() throws IOException {
+      Path finalOutputFile = mapOutputFile.getOutputFile(getTaskId());
+      Path finalIndexFile = mapOutputFile.getOutputIndexFile(getTaskId());
+      
+      if (numSpills == 1) { //the spill is the final output
+        Path spillPath = mapOutputFile.getSpillFile(getTaskId(), 0);
+        Path spillIndexPath = mapOutputFile.getSpillIndexFile(getTaskId(), 0);
+        localFs.rename(spillPath, finalOutputFile);
+        localFs.rename(spillIndexPath, finalIndexFile);
+        return;
+      }
+      
+      //The output stream for the final single output file
+      FSDataOutputStream finalOut = localFs.create(finalOutputFile, true, 
+                                                   4096);
+      //The final index file output stream
+      FSDataOutputStream finalIndexOut = localFs.create(finalIndexFile, true,
+                                                           4096);
+      long segmentStart;
+      
+      if (numSpills == 0) {
+        //create dummy files
+        for (int i = 0; i < partitions; i++) {
+          segmentStart = finalOut.getPos();
+          SequenceFile.createWriter(job, finalOut, 
+                  job.getMapOutputKeyClass(), job.getMapOutputValueClass(), 
+                  compressionType, codec);
+          finalIndexOut.writeLong(segmentStart);
+          finalIndexOut.writeLong(finalOut.getPos() - segmentStart);
         }
+        finalOut.close();
+        finalIndexOut.close();
+        return;
+      }
+      
+      Path [] filename = new Path[numSpills];
+      Path [] indexFileName = new Path[numSpills];
+      FSDataInputStream in[] = new FSDataInputStream[numSpills];
+      FSDataInputStream indexIn[] = new FSDataInputStream[numSpills];
+      
+      for(int i = 0; i < numSpills; i++) {
+        filename[i] = mapOutputFile.getSpillFile(getTaskId(), i);
+        in[i] = localFs.open(filename[i]);
+        indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskId(), i);
+        indexIn[i] = localFs.open(indexFileName[i]);
+      }
+      
+      //create a sorter object as we need access to the SegmentDescriptor
+      //class and merge methods
+      Sorter sorter = new Sorter(localFs, keyClass, valClass, job);
+      sorter.setFactor(numSpills);
+      
+      for (int parts = 0; parts < partitions; parts++){
+        List<SegmentDescriptor> segmentList = new ArrayList(numSpills);
+        for(int i = 0; i < numSpills; i++) {
+          long segmentOffset = indexIn[i].readLong();
+          long segmentLength = indexIn[i].readLong();
+          SegmentDescriptor s = sorter.new SegmentDescriptor(segmentOffset,
+                  segmentLength, filename[i]);
+          s.preserveInput(true);
+          s.doSync();
+          segmentList.add(i, s);
+        }
+        segmentStart = finalOut.getPos();
+        SequenceFile.Writer writer = SequenceFile.createWriter(job, finalOut, 
+                job.getMapOutputKeyClass(), job.getMapOutputValueClass(), 
+                compressionType, codec);
+        sorter.writeFile(sorter.merge(segmentList), writer);
+        //add a sync point - required esp. for block compression to ensure
+        //compressed data from one partition doesn't spill into the next
+        writer.sync();
+        //when we write the offset/length to the final index file, we write
+        //longs for both. This helps us to reliably seek directly to the
+        //offset/length for a partition when we start serving the byte-ranges
+        //to the reduces. We probably waste some space in the file by doing
+        //this as opposed to writing VLong but it helps us later on.
+        finalIndexOut.writeLong(segmentStart);
+        finalIndexOut.writeLong(finalOut.getPos()-segmentStart);
+      }
+      finalOut.close();
+      finalIndexOut.close();
+      //cleanup
+      for(int i = 0; i < numSpills; i++) {
+        in[i].close(); localFs.delete(filename[i]);
+        indexIn[i].close(); localFs.delete(indexFileName[i]);
       }
     }
-    done(umbilical);
-  }
-
-  public void setConf(Configuration conf) {
-    if (conf instanceof JobConf) {
-      this.conf = (JobConf) conf;
-    } else {
-      this.conf = new JobConf(conf);
+    
+    public void close() throws IOException {
+      //empty for now
+    }
+    
+    private class CombineValuesIterator extends ValuesIterator {
+        
+      public CombineValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in, 
+              WritableComparator comparator, Class keyClass,
+              Class valClass, TaskUmbilicalProtocol umbilical, 
+              Configuration conf) 
+      throws IOException {
+        super(in, comparator, keyClass, valClass, umbilical, conf);
+      }
     }
-    this.mapOutputFile.setConf(this.conf);
-  }
-
-  public Configuration getConf() {
-    return this.conf;
   }
-  
 }
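
Because mergeParts() writes exactly two raw longs per partition into file.out.index, a consumer can seek straight to a partition's entry instead of scanning the file. A sketch of how the serving side might locate the byte range for one reduce (illustrative only; the variable names and open files are assumptions):

    FSDataInputStream indexIn = localFs.open(indexFile);  // <taskId>/file.out.index
    indexIn.seek(partition * 16L);                 // two 8-byte longs per partition
    long segmentOffset = indexIn.readLong();
    long segmentLength = indexIn.readLong();
    indexIn.close();
    // bytes [segmentOffset, segmentOffset + segmentLength) of <taskId>/file.out
    // form a self-contained SequenceFile segment holding this reduce's data
    FSDataInputStream dataIn = localFs.open(outputFile);
    dataIn.seek(segmentOffset);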

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MergeSorter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MergeSorter.java?view=auto&rev=483772
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MergeSorter.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MergeSorter.java Thu Dec  7 17:53:07 2006
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.Comparator;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.util.MergeSort;
+import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
+
+/** This class implements the sort method from BasicTypeSorterBase class as
+ * MergeSort. Note that this class is really a wrapper over the actual
+ * mergesort implementation that is there in the util package. The main intent
+ * of providing this class is to setup the input data for the util.MergeSort
+ * algo so that the latter doesn't need to bother about the various data 
+ * structures that have been created for the Map output but rather concentrate 
+ * on the core algorithm (thereby allowing easy integration of a mergesort
+ * implementation). The bridge between this class and the util.MergeSort class
+ * is the Comparator.
+ * @author ddas
+ *
+ */
+class MergeSorter extends BasicTypeSorterBase 
+implements Comparator<IntWritable> {
+  
+  /** The sort method derived from BasicTypeSorterBase and overridden here*/
+  public RawKeyValueIterator sort() {
+    MergeSort m = new MergeSort(this);
+    int count = super.count;
+    if (count == 0) return null;
+    int [] pointers = (int[])super.pointers;
+    int [] pointersCopy = new int[count];
+    System.arraycopy(pointers, 0, pointersCopy, 0, count);
+    m.mergeSort(pointers, pointersCopy, 0, count);
+    return new MRSortResultIterator(super.keyValBuffer, pointersCopy, 
+           super.startOffsets, super.keyLengths, super.valueLengths);
+  }
+  /** The implementation of the compare method from Comparator. This basically
+   * forwards the call to the super class's compare. Note that
+   * Comparator.compare takes objects as inputs and so the int values are
+   * wrapped in (reusable) IntWritables from the class util.MergeSort
+   * @param i
+   * @param j
+   * @return int as per the specification of Comparator.compare
+   */
+  public int compare (IntWritable i, IntWritable j) {
+    return super.compare(i.get(), j.get());
+  }
+}
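
Because the sorter is looked up through map.sort.class, a different in-memory sort can be dropped in without touching MapTask. A hypothetical example (not part of this patch) that keeps the bookkeeping from BasicTypeSorterBase and swaps only the algorithm:

    package org.apache.hadoop.mapred;

    import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;

    /** Hypothetical alternative sorter; it would be enabled by setting
     *  map.sort.class=org.apache.hadoop.mapred.InsertionSorter */
    class InsertionSorter extends BasicTypeSorterBase {
      public RawKeyValueIterator sort() {
        if (count == 0) return null;
        int[] sorted = new int[count];
        System.arraycopy(pointers, 0, sorted, 0, count);
        // indirect insertion sort: order the indices, comparing keys in keyValBuffer
        for (int i = 1; i < count; i++) {
          int p = sorted[i];
          int j = i - 1;
          while (j >= 0 && compare(sorted[j], p) > 0) {
            sorted[j + 1] = sorted[j];
            j--;
          }
          sorted[j + 1] = p;
        }
        return new MRSortResultIterator(keyValBuffer, sorted,
            startOffsets, keyLengths, valueLengths);
      }
    }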

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=483772&r1=483771&r2=483772
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu Dec  7 17:53:07 2006
@@ -121,7 +121,7 @@
   }
 
   /** Iterates values while keys match in sorted input. */
-  private class ValuesIterator implements Iterator {
+  static class ValuesIterator implements Iterator {
     private SequenceFile.Sorter.RawKeyValueIterator in; //input iterator
     private WritableComparable key;               // current key
     private Writable value;                       // current value
@@ -181,9 +181,6 @@
     public WritableComparable getKey() { return key; }
 
     private void getNext() throws IOException {
-      reducePhase.set(in.getProgress().get()); // update progress
-      reportProgress(umbilical);
-
       Writable lastKey = key;                     // save previous key
       try {
         key = (WritableComparable)ReflectionUtils.newInstance(keyClass, this.conf);
@@ -211,13 +208,25 @@
       }
     }
   }
+  private class ReduceValuesIterator extends ValuesIterator {
+    public ReduceValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in,
+                               WritableComparator comparator, Class keyClass,
+                               Class valClass, TaskUmbilicalProtocol umbilical,
+                               Configuration conf)
+    throws IOException {
+      super(in, comparator, keyClass, valClass, umbilical, conf);
+    }
+    public void informReduceProgress() {
+      reducePhase.set(super.in.getProgress().get()); // update progress
+      reportProgress(super.umbilical);
+    }
+  }
 
   public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
     Class valueClass = job.getMapOutputValueClass();
     Reducer reducer = (Reducer)ReflectionUtils.newInstance(
                                   job.getReducerClass(), job);
-    reducer.configure(job);
     FileSystem lfs = FileSystem.getNamed("local", job);
 
     copyPhase.complete();                         // copy is already complete
@@ -262,7 +271,7 @@
       // sort the input file
       SequenceFile.Sorter sorter =
         new SequenceFile.Sorter(lfs, comparator, valueClass, job);
-      rIter = sorter.sortAndIterate(mapFiles, tempDir, 
+      rIter = sorter.merge(mapFiles, tempDir, 
                                     !conf.getKeepFailedTaskFiles()); // sort
 
     } finally {
@@ -300,12 +309,14 @@
     try {
       Class keyClass = job.getMapOutputKeyClass();
       Class valClass = job.getMapOutputValueClass();
-      ValuesIterator values = new ValuesIterator(rIter, comparator, keyClass, 
-                                                 valClass, umbilical, job);
+      ReduceValuesIterator values = new ReduceValuesIterator(rIter, comparator, 
+                                  keyClass, valClass, umbilical, job);
+      values.informReduceProgress();
       while (values.more()) {
         myMetrics.reduceInput();
         reducer.reduce(values.getKey(), values, collector, reporter);
         values.nextKey();
+        values.informReduceProgress();
       }
 
     } finally {
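
For orientation, a minimal sketch of the grouping behavior that
ValuesIterator provides and that ReduceValuesIterator extends with a per-key
progress update (in place of the per-value update removed from getNext
above). The GroupingSketch class is hypothetical and uses plain java.util
types instead of the Writable, comparator and umbilical machinery; only the
control flow mirrors the run loop above.

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical illustration: walk a key-sorted sequence of (key, value)
    // pairs, grouping the run of values that share each key, and report
    // progress once per key rather than once per value.
    public class GroupingSketch {
      public static void main(String[] args) {
        // Already sorted by key, as the merged map output would be.
        String[][] sorted = { {"a", "1"}, {"a", "2"}, {"b", "3"},
                              {"c", "4"}, {"c", "5"} };
        int i = 0;
        while (i < sorted.length) {                // values.more()
          String key = sorted[i][0];
          List<String> values = new ArrayList<String>();
          while (i < sorted.length && sorted[i][0].equals(key)) {
            values.add(sorted[i][1]);              // values for this key
            i++;
          }
          System.out.println(key + " -> " + values);   // the reduce call
          // like values.informReduceProgress(): one update per key
          System.out.println("progress: " + (100 * i / sorted.length) + "%");
        }
      }
    }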

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=483772&r1=483771&r2=483772
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Dec  7 17:53:07 2006
@@ -21,6 +21,7 @@
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.ipc.*;
+import org.apache.hadoop.io.*;
 import org.apache.hadoop.metrics.Metrics;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -1525,6 +1526,7 @@
      * @author Owen O'Malley
      */
     public static class MapOutputServlet extends HttpServlet {
+      private static final int MAX_BYTES_TO_READ = 64 * 1024;
       public void doGet(HttpServletRequest request, 
                         HttpServletResponse response
                        ) throws ServletException, IOException {
@@ -1535,20 +1537,37 @@
         }
         ServletContext context = getServletContext();
         int reduce = Integer.parseInt(reduceId);
-        byte[] buffer = new byte[64*1024];
+        byte[] buffer = new byte[MAX_BYTES_TO_READ];
         OutputStream outStream = response.getOutputStream();
         JobConf conf = (JobConf) context.getAttribute("conf");
         FileSystem fileSys = 
           (FileSystem) context.getAttribute("local.file.system");
-        Path filename = conf.getLocalPath(mapId+"/part-"+reduce+".out");
-        response.setContentLength((int) fileSys.getLength(filename));
-        InputStream inStream = null;
+        //open index file
+        Path indexFileName = conf.getLocalPath(mapId+"/file.out.index");
+        FSDataInputStream in = fileSys.open(indexFileName);
+        //seek to the correct offset for the given reduce
+        in.seek(reduce * 16);
+        
+        //read the offset and length of the partition data
+        long startOffset = in.readLong();
+        long partLength = in.readLong();
+        
+        in.close();
+         
+        Path mapOutputFileName = conf.getLocalPath(mapId+"/file.out"); 
+           
+        response.setContentLength((int) partLength);
+        FSDataInputStream inStream = null;
         // true iff IOException was caused by attempt to access input
         boolean isInputException = true;
         try {
-          inStream = fileSys.open(filename);
+          inStream = fileSys.open(mapOutputFileName);
+          inStream.seek(startOffset);
           try {
-            int len = inStream.read(buffer);
+            int totalRead = 0;
+            int len = inStream.read(buffer, 0,
+                                 partLength < MAX_BYTES_TO_READ 
+                                 ? (int)partLength : MAX_BYTES_TO_READ);
             while (len > 0) {
               try {
                 outStream.write(buffer, 0, len);
@@ -1556,7 +1575,11 @@
                 isInputException = false;
                 throw ie;
               }
-              len = inStream.read(buffer);
+              totalRead += len;
+              if (totalRead == partLength) break;
+              len = inStream.read(buffer, 0, 
+                      (partLength - totalRead) < MAX_BYTES_TO_READ
+                       ? (int)(partLength - totalRead) : MAX_BYTES_TO_READ);
             }
           } finally {
             inStream.close();
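
The servlet change above depends on the layout of file.out.index: one
16-byte record per reduce partition, holding two longs, the start offset and
the length of that partition's data within file.out (hence the seek to
reduce * 16). The following self-contained sketch illustrates that layout
only; the IndexFileSketch class, the file names in the working directory and
the sample offsets are hypothetical, not the code that actually writes the
index during the map task.

    import java.io.DataOutputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.RandomAccessFile;

    // Hypothetical sketch of the index layout read by MapOutputServlet:
    // record r describes reduce partition r, in bytes [16*r, 16*r + 16).
    public class IndexFileSketch {
      public static void main(String[] args) throws IOException {
        // (offset, length) of each partition inside file.out -- sample values.
        long[][] partitions = { {0L, 1024L}, {1024L, 512L}, {1536L, 2048L} };

        DataOutputStream out =
          new DataOutputStream(new FileOutputStream("file.out.index"));
        for (long[] p : partitions) {
          out.writeLong(p[0]);               // start offset of the partition
          out.writeLong(p[1]);               // length of the partition
        }
        out.close();

        int reduce = 2;                      // partition asked for by a reduce
        RandomAccessFile in = new RandomAccessFile("file.out.index", "r");
        in.seek(reduce * 16);                // two 8-byte longs per record
        long startOffset = in.readLong();
        long partLength = in.readLong();
        in.close();
        System.out.println("partition " + reduce + ": offset=" + startOffset
                           + " length=" + partLength);
      }
    }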

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/MergeSort.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/MergeSort.java?view=auto&rev=483772
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/MergeSort.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/MergeSort.java Thu Dec  7 17:53:07 2006
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.util.Comparator;
+import org.apache.hadoop.io.IntWritable;
+
+/** An implementation of the core algorithm of MergeSort. */
+public class MergeSort {
+  //Reusable IntWritables
+  IntWritable I = new IntWritable(0);
+  IntWritable J = new IntWritable(0);
+  
+  private Comparator comparator; //the comparator that the algo should use
+  
+  public MergeSort(Comparator comparator) {
+    this.comparator = comparator;
+  }
+  
+  public void mergeSort(int src[], int dest[], int low, int high) {
+    int length = high - low;
+
+    // Insertion sort on smallest arrays
+    if (length < 7) {
+      for (int i=low; i<high; i++) {
+        for (int j=i;j > low ; j--) {
+          I.set(dest[j-1]);
+          J.set(dest[j]);
+          if (comparator.compare(I, J)>0)
+            swap(dest, j, j-1);
+        }
+      }
+      return;
+    }
+
+    // Recursively sort halves of dest into src
+    int mid = (low + high) >> 1;
+    mergeSort(dest, src, low, mid);
+    mergeSort(dest, src, mid, high);
+
+    I.set(src[mid-1]);
+    J.set(src[mid]);
+    // If list is already sorted, just copy from src to dest.  This is an
+    // optimization that results in faster sorts for nearly ordered lists.
+    if (comparator.compare(I, J) <= 0) {
+      System.arraycopy(src, low, dest, low, length);
+      return;
+    }
+
+    // Merge sorted halves (now in src) into dest
+    for (int i = low, p = low, q = mid; i < high; i++) {
+      if (q < high && p < mid) {
+        I.set(src[p]);
+        J.set(src[q]);
+      }
+      if (q>=high || p<mid && comparator.compare(I, J) <= 0)
+        dest[i] = src[p++];
+      else
+        dest[i] = src[q++];
+    }
+  }
+
+  private void swap(int x[], int a, int b) {
+    int t = x[a];
+    x[a] = x[b];
+    x[b] = t;
+  }
+}
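
A minimal usage sketch of the MergeSort class added above: the caller passes
two arrays with identical contents and reads the sorted result from the dest
(second) argument, which is how MergeSorter.sort uses it earlier in this
commit. The MergeSortDemo class, sample data and natural-order comparator
are hypothetical.

    import java.util.Comparator;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.util.MergeSort;

    public class MergeSortDemo {
      public static void main(String[] args) {
        int[] data    = { 5, 3, 8, 1, 9, 2 };
        int[] scratch = data.clone();   // src and dest start out identical
        Comparator<IntWritable> natural = new Comparator<IntWritable>() {
          public int compare(IntWritable a, IntWritable b) {
            return a.get() - b.get();   // fine for small non-negative values
          }
        };
        new MergeSort(natural).mergeSort(scratch, data, 0, data.length);
        // The dest array (second argument) holds the sorted result.
        for (int v : data) {
          System.out.print(v + " ");    // 1 2 3 5 8 9
        }
        System.out.println();
      }
    }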

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java?view=diff&rev=483772&r1=483771&r2=483772
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java Thu Dec  7 17:53:07 2006
@@ -233,16 +233,6 @@
       }
 
       public void close() throws IOException {
-        MapOutputFile namer = new MapOutputFile();
-        namer.setConf(conf);
-        FileSystem fs = FileSystem.get(conf);
-        Path output = namer.getOutputFile(taskId, 0);
-        assertTrue("map output exists " + output, fs.exists(output));
-        SequenceFile.Reader rdr = 
-          new SequenceFile.Reader(fs, output, conf);
-        assertEquals("is map output compressed " + output, compress, 
-                     rdr.isCompressed());
-        rdr.close();
       }
     }
     
@@ -264,7 +254,7 @@
                         ) throws IOException {
         if (first) {
           first = false;
-          Path input = conf.getLocalPath(taskId+"/all.2");
+          Path input = conf.getLocalPath(taskId+"/map_0.out");
           FileSystem fs = FileSystem.get(conf);
           assertTrue("reduce input exists " + input, fs.exists(input));
           SequenceFile.Reader rdr =