You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2008/06/05 06:06:14 UTC

svn commit: r663440 [2/3] - in /hadoop/core/trunk: ./ conf/ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/io/compress/ src/java/org/apache/hadoop/io/compress/zlib/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/util/ src/test...

Added: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Merger.java?rev=663440&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Merger.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Merger.java Wed Jun  4 21:06:13 2008
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumFileSystem;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.IFile.Reader;
+import org.apache.hadoop.mapred.IFile.Writer;
+import org.apache.hadoop.util.PriorityQueue;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
+
+class Merger {  
+  private static final Log LOG = LogFactory.getLog(Merger.class);
+  
+  private static final long PROGRESS_BAR = 10000;
+  
+  public static <K extends Object, V extends Object>
+  RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                            Class<K> keyClass, Class<V> valueClass, 
+                            CompressionCodec codec,
+                            Path[] inputs, boolean deleteInputs, 
+                            int mergeFactor, Path tmpDir,
+                            RawComparator<K> comparator, Progressable reporter)
+  throws IOException {
+    return 
+      new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator, 
+                           reporter).merge(keyClass, valueClass,
+                                           mergeFactor, tmpDir);
+  }
+  
+  public static <K extends Object, V extends Object>
+  RawKeyValueIterator merge(Configuration conf, FileSystem fs, 
+                            Class<K> keyClass, Class<V> valueClass, 
+                            List<Segment<K, V>> segments, 
+                            int mergeFactor, Path tmpDir,
+                            RawComparator<K> comparator, Progressable reporter)
+  throws IOException {
+    return 
+      new MergeQueue<K, V>(conf, fs, segments, 
+                           comparator, reporter).merge(keyClass, valueClass,
+                                                       mergeFactor, tmpDir);
+  }
+
+  public static <K extends Object, V extends Object>
+  void writeFile(RawKeyValueIterator records, Writer<K, V> writer, 
+                 Progressable progressable) 
+  throws IOException {
+    long recordCtr = 0;
+    while(records.next()) {
+      writer.append(records.getKey(), records.getValue());
+      
+      if ((++recordCtr % PROGRESS_BAR) == 0) {
+        progressable.progress();
+      }
+    }
+}
+
+  public static class Segment<K extends Object, V extends Object> {
+    Reader<K, V> reader = null;
+    DataInputBuffer key = new DataInputBuffer();
+    DataInputBuffer value = new DataInputBuffer();
+    
+    Configuration conf = null;
+    FileSystem fs = null;
+    Path file = null;
+    boolean preserve = false;
+    CompressionCodec codec = null;
+    long segmentLength = -1;
+    
+    public Segment(Configuration conf, FileSystem fs, Path file,
+                   CompressionCodec codec, boolean preserve) throws IOException {
+      this.conf = conf;
+      this.fs = fs;
+      this.file = file;
+      this.codec = codec;
+      this.preserve = preserve;
+      
+      this.segmentLength = fs.getFileStatus(file).getLen();
+    }
+    
+    public Segment(Reader<K, V> reader, boolean preserve) {
+      this.reader = reader;
+      this.preserve = preserve;
+      
+      this.segmentLength = reader.getLength();
+    }
+
+    private void init() throws IOException {
+      if (reader == null) {
+        reader = new Reader<K, V>(conf, fs, file, codec);
+      }
+    }
+    
+    DataInputBuffer getKey() { return key; }
+    DataInputBuffer getValue() { return value; }
+
+    long getLength() { return segmentLength; }
+    
+    boolean next() throws IOException {
+      return reader.next(key, value);
+    }
+    
+    void close() throws IOException {
+      reader.close();
+      
+      if (!preserve && fs != null) {
+        fs.delete(file, false);
+      }
+    }
+  }
+  
+  private static class MergeQueue<K extends Object, V extends Object> 
+  extends PriorityQueue implements RawKeyValueIterator {
+    Configuration conf;
+    FileSystem fs;
+    CompressionCodec codec;
+    
+    List<Segment<K, V>> segments = new ArrayList<Segment<K,V>>();
+    
+    RawComparator<K> comparator;
+    
+    private long totalBytesProcessed;
+    private float progPerByte;
+    private Progress mergeProgress = new Progress();
+    
+    Progressable reporter;
+    
+    DataInputBuffer key;
+    DataInputBuffer value;
+    
+    Segment<K, V> minSegment;
+    Comparator<Segment<K, V>> segmentComparator =   
+      new Comparator<Segment<K, V>>() {
+      public int compare(Segment<K, V> o1, Segment<K, V> o2) {
+        if (o1.getLength() == o2.getLength()) {
+          return 0;
+        }
+
+        return o1.getLength() < o2.getLength() ? -1 : 1;
+      }
+    };
+
+    
+    public MergeQueue(Configuration conf, FileSystem fs, 
+                      Path[] inputs, boolean deleteInputs, 
+                      CompressionCodec codec, RawComparator<K> comparator,
+                      Progressable reporter) 
+    throws IOException {
+      this.conf = conf;
+      this.fs = fs;
+      this.codec = codec;
+      this.comparator = comparator;
+      this.reporter = reporter;
+      
+      for (Path file : inputs) {
+        segments.add(new Segment<K, V>(conf, fs, file, codec, !deleteInputs));
+      }
+      
+      // Sort segments on file-lengths
+      Collections.sort(segments, segmentComparator); 
+    }
+    
+
+    public MergeQueue(Configuration conf, FileSystem fs, 
+        List<Segment<K, V>> segments, RawComparator<K> comparator,
+        Progressable reporter) {
+      this.conf = conf;
+      this.fs = fs;
+      this.comparator = comparator;
+      this.segments = segments;
+      this.reporter = reporter;
+    }
+
+    @SuppressWarnings("unchecked")
+    public void close() throws IOException {
+      Segment<K, V> segment;
+      while((segment = (Segment<K, V>)pop()) != null) {
+        segment.close();
+      }
+    }
+
+    public DataInputBuffer getKey() throws IOException {
+      return key;
+    }
+
+    public DataInputBuffer getValue() throws IOException {
+      return value;
+    }
+
+    private void adjustPriorityQueue(Segment<K, V> reader) throws IOException{
+      if (reader.next()) {
+        adjustTop();
+      } else {
+        pop();
+        reader.close();
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    public boolean next() throws IOException {
+      if (size() == 0)
+        return false;
+
+      if (minSegment != null) {
+        //minSegment is non-null for all invocations of next except the first
+        //one. For the first invocation, the priority queue is ready for use
+        //but for the subsequent invocations, first adjust the queue 
+        adjustPriorityQueue(minSegment);
+        if (size() == 0) {
+          minSegment = null;
+          return false;
+        }
+      }
+      minSegment = (Segment<K, V>)top();
+      
+      key = minSegment.getKey();
+      value = minSegment.getValue();
+
+      totalBytesProcessed += (key.getLength()-key.getPosition()) + 
+                             (value.getLength()-value.getPosition());
+      mergeProgress.set(totalBytesProcessed * progPerByte);
+
+      return true;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected boolean lessThan(Object a, Object b) {
+      DataInputBuffer key1 = ((Segment<K, V>)a).getKey();
+      DataInputBuffer key2 = ((Segment<K, V>)b).getKey();
+      int s1 = key1.getPosition();
+      int l1 = key1.getLength() - s1;
+      int s2 = key2.getPosition();
+      int l2 = key2.getLength() - s2;
+
+      return comparator.compare(key1.getData(), s1, l1, key2.getData(), s2, l2) < 0;
+    }
+    
+    public RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
+                                     int factor, Path tmpDir) 
+    throws IOException {
+      LOG.info("Merging " + segments.size() + " sorted segments");
+      
+      //create the MergeStreams from the sorted map created in the constructor
+      //and dump the final output to a file
+      int numSegments = segments.size();
+      int origFactor = factor;
+      int passNo = 1;
+      LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+      do {
+        //get the factor for this pass of merge
+        factor = getPassFactor(factor, passNo, numSegments);
+        List<Segment<K, V>> segmentsToMerge =
+          new ArrayList<Segment<K, V>>();
+        int segmentsConsidered = 0;
+        int numSegmentsToConsider = factor;
+        while (true) {
+          //extract the smallest 'factor' number of segments  
+          //Call cleanup on the empty segments (no key/value data)
+          List<Segment<K, V>> mStream = 
+            getSegmentDescriptors(numSegmentsToConsider);
+          for (Segment<K, V> segment : mStream) {
+            // Initialize the segment at the last possible moment;
+            // this helps in ensuring we don't use buffers until we need them
+            segment.init();
+            
+            if (segment.next()) {
+              segmentsToMerge.add(segment);
+              segmentsConsidered++;
+            }
+            else {
+              segment.close();
+              numSegments--; //we ignore this segment for the merge
+            }
+          }
+          //if we have the desired number of segments
+          //or looked at all available segments, we break
+          if (segmentsConsidered == factor || 
+              segments.size() == 0) {
+            break;
+          }
+            
+          numSegmentsToConsider = factor - segmentsConsidered;
+        }
+        
+        //feed the streams to the priority queue
+        initialize(segmentsToMerge.size()); clear();
+        for (Segment<K, V> segment : segmentsToMerge) {
+          put(segment);
+        }
+        
+        //if we have lesser number of segments remaining, then just return the
+        //iterator, else do another single level merge
+        if (numSegments <= factor) {
+          //calculate the length of the remaining segments. Required for 
+          //calculating the merge progress
+          long totalBytes = 0;
+          for (int i = 0; i < segmentsToMerge.size(); i++) {
+            totalBytes += segmentsToMerge.get(i).getLength();
+          }
+          if (totalBytes != 0) //being paranoid
+            progPerByte = 1.0f / (float)totalBytes;
+
+          // Reset bytes-processed to track the progress of the final merge
+          totalBytesProcessed = 0;
+          
+          LOG.info("Down to the last merge-pass, with " + numSegments + 
+                   " segments left of total size: " + totalBytes + " bytes");
+          return this;
+        } else {
+          LOG.info("Merging " + segmentsToMerge.size() + 
+                   " intermediate segments out of a total of " + 
+                   (segments.size()+segmentsToMerge.size()));
+          
+          //we want to spread the creation of temp files on multiple disks if 
+          //available under the space constraints
+          long approxOutputSize = 0; 
+          for (Segment<K, V> s : segmentsToMerge) {
+            approxOutputSize += s.getLength() + 
+                                ChecksumFileSystem.getApproxChkSumLength(
+                                s.getLength());
+          }
+          Path tmpFilename = 
+            new Path(tmpDir, "intermediate").suffix("." + passNo);
+
+          Path outputFile =  lDirAlloc.getLocalPathForWrite(
+                                              tmpFilename.toString(),
+                                              approxOutputSize, conf);
+
+          Writer<K, V> writer = 
+            new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec);
+          writeFile(this, writer, reporter);
+          writer.close();
+          
+          //we finished one single level merge; now clean up the priority 
+          //queue
+          this.close();
+
+          // Add the newly create segment to the list of segments to be merged
+          Segment<K, V> tempSegment = 
+            new Segment<K, V>(conf, fs, outputFile, codec, false);
+          segments.add(tempSegment);
+          numSegments = segments.size();
+          Collections.sort(segments, segmentComparator);
+          
+          passNo++;
+        }
+        //we are worried about only the first pass merge factor. So reset the 
+        //factor to what it originally was
+        factor = origFactor;
+      } while(true);
+    }
+    
+    //HADOOP-591
+    private int getPassFactor(int factor, int passNo, int numSegments) {
+      if (passNo > 1 || numSegments <= factor || factor == 1) 
+        return factor;
+      int mod = (numSegments - 1) % (factor - 1);
+      if (mod == 0)
+        return factor;
+      return mod + 1;
+    }
+    
+    /** Return (& remove) the requested number of segment descriptors from the
+     * sorted map.
+     */
+    private List<Segment<K, V>> getSegmentDescriptors(int numDescriptors) {
+      if (numDescriptors > segments.size()) {
+        List<Segment<K, V>> subList = new ArrayList<Segment<K,V>>(segments);
+        segments.clear();
+        return subList;
+      }
+      
+      List<Segment<K, V>> subList = 
+        new ArrayList<Segment<K,V>>(segments.subList(0, numDescriptors));
+      for (int i=0; i < numDescriptors; ++i) {
+        segments.remove(0);
+      }
+      return subList;
+    }
+
+    public Progress getProgress() {
+      return mergeProgress;
+    }
+
+  }
+}
\ No newline at end of file

Added: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RamManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RamManager.java?rev=663440&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RamManager.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RamManager.java Wed Jun  4 21:06:13 2008
@@ -0,0 +1,48 @@
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+
+class RamManager {
+  volatile private int numReserved = 0;
+  volatile private int size = 0;
+  private final int maxSize;
+  
+  public RamManager(Configuration conf) {
+    maxSize = conf.getInt("fs.inmemory.size.mb", 100) * 1024 * 1024;
+  }
+  
+  synchronized boolean reserve(long requestedSize) {
+    if (requestedSize > Integer.MAX_VALUE || 
+        (size + requestedSize) > Integer.MAX_VALUE) {
+      return false;
+    }
+    
+    if ((size + requestedSize) < maxSize) {
+      size += requestedSize;
+      ++numReserved;
+      return true;
+    }
+    return false;
+  }
+  
+  synchronized void unreserve(int requestedSize) {
+    size -= requestedSize;
+    --numReserved;
+  }
+  
+  int getUsedMemory() {
+    return size;
+  }
+  
+  float getPercentUsed() {
+    return (float)size/maxSize;
+  }
+  
+  int getReservedFiles() {
+    return numReserved;
+  }
+  
+  int getMemoryLimit() {
+    return maxSize;
+  }
+}

Added: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RawKeyValueIterator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RawKeyValueIterator.java?rev=663440&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RawKeyValueIterator.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RawKeyValueIterator.java Wed Jun  4 21:06:13 2008
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.Progress;
+
+/**
+ * <code>RawKeyValueIterator</code> is an iterator used to iterate over
+ * the raw keys and values during sort/merge of intermediate data. 
+ */
+interface RawKeyValueIterator {
+  /** 
+   * Gets the current raw key.
+   * 
+   * @return Gets the current raw key as a DataInputBuffer
+   * @throws IOException
+   */
+  DataInputBuffer getKey() throws IOException;
+  
+  /** 
+   * Gets the current raw value.
+   * 
+   * @return Gets the current raw value as a DataInputBuffer 
+   * @throws IOException
+   */
+  DataInputBuffer getValue() throws IOException;
+  
+  /** 
+   * Sets up the current key and value (for getKey and getValue).
+   * 
+   * @return <code>true</code> if there exists a key/value, 
+   *         <code>false</code> otherwise. 
+   * @throws IOException
+   */
+  boolean next() throws IOException;
+  
+  /** 
+   * Closes the iterator so that the underlying streams can be closed.
+   * 
+   * @throws IOException
+   */
+  void close() throws IOException;
+  
+  /** Gets the Progress object; this has a float (0.0 - 1.0) 
+   * indicating the bytes processed by the iterator so far
+   */
+  Progress getProgress();
+}

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed Jun  4 21:06:13 2008
@@ -22,21 +22,23 @@
 import java.io.DataOutput;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.URI;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.net.URLConnection;
 import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Hashtable;
 import java.util.LinkedHashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.Random;
 import java.util.Set;
 import java.util.SortedSet;
@@ -48,20 +50,20 @@
 import org.apache.hadoop.fs.ChecksumFileSystem;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.InMemoryFileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.InputBuffer;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.IFile.*;
+import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
@@ -86,6 +88,9 @@
   private int numMaps;
   private ReduceCopier reduceCopier;
 
+  private CompressionCodec codec;
+
+
   { 
     getProgress().setStatus("reduce"); 
     setPhase(TaskStatus.Phase.SHUFFLE);        // phase to start with 
@@ -137,6 +142,17 @@
     super(jobFile, taskId, partition);
     this.numMaps = numMaps;
   }
+  
+  private CompressionCodec initCodec() {
+    // check if map-outputs are to be compressed
+    if (conf.getCompressMapOutput()) {
+      Class<? extends CompressionCodec> codecClass =
+        conf.getMapOutputCompressorClass(DefaultCodec.class);
+      return (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
+    } 
+
+    return null;
+  }
 
   @Override
   public TaskRunner createRunner(TaskTracker tracker) throws IOException {
@@ -191,118 +207,9 @@
     return fileList.toArray(new Path[0]);
   }
 
-  /** Iterates values while keys match in sorted input. */
-  static class ValuesIterator<KEY,VALUE> implements Iterator<VALUE> {
-    private SequenceFile.Sorter.RawKeyValueIterator in; //input iterator
-    private KEY key;               // current key
-    private KEY nextKey;
-    private VALUE value;             // current value
-    private boolean hasNext;                      // more w/ this key
-    private boolean more;                         // more in file
-    private RawComparator<KEY> comparator;
-    private DataOutputBuffer nextValue = new DataOutputBuffer();
-    private InputBuffer valIn = new InputBuffer();
-    private InputBuffer keyIn = new InputBuffer();
-    protected Progressable reporter;
-    private Deserializer<KEY> keyDeserializer;
-    private Deserializer<VALUE> valDeserializer;
-
-    @SuppressWarnings("unchecked")
-    public ValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in, 
-                           RawComparator<KEY> comparator, 
-                           Class<KEY> keyClass,
-                           Class<VALUE> valClass, Configuration conf, 
-                           Progressable reporter)
-      throws IOException {
-      this.in = in;
-      this.comparator = comparator;
-      this.reporter = reporter;
-      SerializationFactory serializationFactory = new SerializationFactory(conf);
-      this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
-      this.keyDeserializer.open(keyIn);
-      this.valDeserializer = serializationFactory.getDeserializer(valClass);
-      this.valDeserializer.open(valIn);
-      readNextKey();
-      key = nextKey;
-      nextKey = null; // force new instance creation
-      hasNext = more;
-    }
-
-    /// Iterator methods
-
-    public boolean hasNext() { return hasNext; }
-
-    public VALUE next() {
-      if (!hasNext) {
-        throw new NoSuchElementException("iterate past last value");
-      }
-      try {
-        readNextValue();
-        readNextKey();
-      } catch (IOException ie) {
-        throw new RuntimeException("problem advancing", ie);
-      }
-      reporter.progress();
-      return value;
-    }
-
-    public void remove() { throw new RuntimeException("not implemented"); }
-
-    /// Auxiliary methods
-
-    /** Start processing next unique key. */
-    public void nextKey() throws IOException {
-      // read until we find a new key
-      while (hasNext) { 
-        readNextKey();
-      }
-      // move the next key to the current one
-      KEY tmpKey = key;
-      key = nextKey;
-      nextKey = tmpKey;
-      hasNext = more;
-    }
-
-    /** True iff more keys remain. */
-    public boolean more() { 
-      return more; 
-    }
-
-    /** The current key. */
-    public Object getKey() { 
-      return key; 
-    }
-
-    /** 
-     * read the next key 
-     */
-    private void readNextKey() throws IOException {
-      more = in.next();
-      if (more) {
-        DataOutputBuffer nextKeyBytes = in.getKey();
-        keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getLength());
-        nextKey = keyDeserializer.deserialize(nextKey);
-        hasNext = key != null && (comparator.compare(key, nextKey) == 0);
-      } else {
-        hasNext = false;
-      }
-    }
-
-    /**
-     * Read the next value
-     * @throws IOException
-     */
-    private void readNextValue() throws IOException {
-      nextValue.reset();
-      in.getValue().writeUncompressedBytes(nextValue);
-      valIn.reset(nextValue.getData(), nextValue.getLength());
-      value = valDeserializer.deserialize(value);
-    }
-  }
-
   private class ReduceValuesIterator<KEY,VALUE> 
           extends ValuesIterator<KEY,VALUE> {
-    public ReduceValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in,
+    public ReduceValuesIterator (RawKeyValueIterator in,
                                  RawComparator<KEY> comparator, 
                                  Class<KEY> keyClass,
                                  Class<VALUE> valClass,
@@ -310,15 +217,17 @@
       throws IOException {
       super(in, comparator, keyClass, valClass, conf, reporter);
     }
-    public void informReduceProgress() {
-      reducePhase.set(super.in.getProgress().get()); // update progress
-      reporter.progress();
-    }
+
     @Override
     public VALUE next() {
       reduceInputValueCounter.increment(1);
       return super.next();
     }
+    
+    public void informReduceProgress() {
+      reducePhase.set(super.in.getProgress().get()); // update progress
+      reporter.progress();
+    }
   }
 
   @Override
@@ -332,6 +241,10 @@
     startCommunicationThread(umbilical);
 
     FileSystem lfs = FileSystem.getLocal(job);
+    
+    // Initialize the codec
+    codec = initCodec();
+
     boolean isLocal = true;
     if (!job.get("mapred.job.tracker", "local").equals("local")) {
       reduceCopier = new ReduceCopier(umbilical, job);
@@ -347,21 +260,21 @@
     Path[] mapFiles = getMapFiles(lfs, isLocal);
     
     Path tempDir = new Path(getTaskID().toString()); 
-
-    SequenceFile.Sorter.RawKeyValueIterator rIter;
  
     setPhase(TaskStatus.Phase.SORT); 
 
     final Reporter reporter = getReporter(umbilical);
     
     // sort the input file
-    SequenceFile.Sorter sorter = new SequenceFile.Sorter(lfs, 
-        job.getOutputKeyComparator(), job.getMapOutputKeyClass(),
-        job.getMapOutputValueClass(), job);
-    sorter.setProgressable(reporter);
-    rIter = sorter.merge(mapFiles, tempDir, 
-        !conf.getKeepFailedTaskFiles()); // sort
-
+    LOG.info("Initiating final on-disk merge with " + mapFiles.length + 
+             " files");
+    RawKeyValueIterator rIter = 
+      Merger.merge(job, lfs,
+                   job.getMapOutputKeyClass(), job.getMapOutputValueClass(),
+                   codec, mapFiles, !conf.getKeepFailedTaskFiles(), 
+                   job.getInt("io.sort.factor", 100), tempDir, 
+                   job.getOutputKeyComparator(), reporter); 
+        
     // free up the data structures
     mapOutputFilesOnDisk.clear();
     mapFiles = null;
@@ -371,6 +284,7 @@
 
     // make output collector
     String finalName = getOutputName(getPartition());
+
     FileSystem fs = FileSystem.get(job);
 
     final RecordWriter out = 
@@ -421,7 +335,7 @@
     done(umbilical);
   }
 
-  class ReduceCopier implements MRConstants {
+  class ReduceCopier<K, V> implements MRConstants {
 
     /** Reference to the umbilical object */
     private TaskUmbilicalProtocol umbilical;
@@ -479,18 +393,15 @@
     /**
      * A reference to the in memory file system for writing the map outputs to.
      */
-    private InMemoryFileSystem inMemFileSys;
+    //private InMemoryFileSystem inMemFileSys;
+    
+    private RamManager ramManager;
     
     /**
      * A reference to the local file system for writing the map outputs to.
      */
     private FileSystem localFileSys;
-    
-    /**
-     * An instance of the sorter used for doing merge
-     */
-    private SequenceFile.Sorter sorter;
-    
+
     /**
      * Number of files to merge at a time
      */
@@ -519,7 +430,7 @@
     /**
      * The threads for fetching the files.
      */
-    private MapOutputCopier[] copiers = null;
+    private List<MapOutputCopier> copiers = null;
     
     /**
      * The object for metrics reporting.
@@ -540,8 +451,8 @@
     /** 
      * The set of required map outputs
      */
-    private Set <Integer> neededOutputs = 
-      Collections.synchronizedSet(new TreeSet<Integer>());
+    private Set <TaskID> copiedMapOutputs = 
+      Collections.synchronizedSet(new TreeSet<TaskID>());
     
     /** 
      * The set of obsolete map taskids.
@@ -602,7 +513,7 @@
      * The maps from which we fail to fetch map-outputs 
      * even after {@link #maxFetchRetriesPerMap} retries.
      */
-    Set<Integer> fetchFailedMaps = new TreeSet<Integer>(); 
+    Set<TaskID> fetchFailedMaps = new TreeSet<TaskID>(); 
     
     /**
      * A map of taskId -> no. of failed fetches
@@ -620,6 +531,13 @@
      */
     private static final int MIN_LOG_TIME = 60000;
 
+    /** 
+     * List of in-memory map-outputs.
+     */
+    private final List<MapOutput> mapOutputsFilesInMemory =
+      Collections.synchronizedList(new LinkedList<MapOutput>());
+    
+
     /**
      * This class contains the methods that should be used for metrics-reporting
      * the specific metrics for shuffle. This class actually reports the
@@ -696,7 +614,6 @@
         this.size = size;
       }
       
-      public int getMapId() { return loc.getMapId(); }
       public boolean getSuccess() { return size >= 0; }
       public boolean isObsolete() { 
         return size == OBSOLETE;
@@ -706,28 +623,114 @@
       public MapOutputLocation getLocation() { return loc; }
     }
     
-    private int extractMapIdFromPathName(Path pathname) {
-      //all paths end with map_<id>.out
-      String firstPathName = pathname.getName();
-      int beginIndex = firstPathName.lastIndexOf("map_");
-      int endIndex = firstPathName.lastIndexOf(".out");
-      return Integer.parseInt(firstPathName.substring(beginIndex +
-                              "map_".length(), endIndex));
+    private int nextMapOutputCopierId = 0;
+    
+    /**
+     * Abstraction to track a map-output.
+     */
+    private class MapOutputLocation {
+      TaskAttemptID taskAttemptId;
+      TaskID taskId;
+      String ttHost;
+      URL taskOutput;
+      
+      public MapOutputLocation(TaskAttemptID taskAttemptId, 
+                               String ttHost, URL taskOutput) {
+        this.taskAttemptId = taskAttemptId;
+        this.taskId = this.taskAttemptId.getTaskID();
+        this.ttHost = ttHost;
+        this.taskOutput = taskOutput;
+      }
+      
+      public TaskAttemptID getTaskAttemptId() {
+        return taskAttemptId;
+      }
+      
+      public TaskID getTaskId() {
+        return taskId;
+      }
+      
+      public String getHost() {
+        return ttHost;
+      }
+      
+      public URL getOutputLocation() {
+        return taskOutput;
+      }
     }
     
-    private int nextMapOutputCopierId = 0;
+    /** Describes the output of a map; could either be on disk or in-memory. */
+    private class MapOutput {
+      final TaskID mapId;
+      
+      final Path file;
+      final Configuration conf;
+      
+      byte[] data;
+      final boolean inMemory;
+      long size;
+      
+      public MapOutput(TaskID mapId, Configuration conf, Path file, long size) {
+        this.mapId = mapId;
+        
+        this.conf = conf;
+        this.file = file;
+        this.size = size;
+        
+        this.data = null;
+        
+        this.inMemory = false;
+      }
+      
+      public MapOutput(TaskID mapId, byte[] data) {
+        this.mapId = mapId;
+        
+        this.file = null;
+        this.conf = null;
+        
+        this.data = data;
+        this.size = data.length;
+        
+        this.inMemory = true;
+      }
+      
+      public void discard() throws IOException {
+        if (inMemory) {
+          data = null;
+        } else {
+          FileSystem fs = file.getFileSystem(conf);
+          fs.delete(file, true);
+        }
+      }
+    }
     
     /** Copies map outputs as they become available */
     private class MapOutputCopier extends Thread {
-      
+      // basic/unit connection timeout (in milliseconds)
+      private final static int UNIT_CONNECT_TIMEOUT = 30 * 1000;
+      // default read timeout (in milliseconds)
+      private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
+
       private MapOutputLocation currentLocation = null;
       private int id = nextMapOutputCopierId++;
       private Reporter reporter;
       
-      public MapOutputCopier(Reporter reporter) {
+      // Decompression of map-outputs
+      private CompressionCodec codec = null;
+      private Decompressor decompressor = null;
+      
+      public MapOutputCopier(JobConf job, Reporter reporter) {
         setName("MapOutputCopier " + reduceTask.getTaskID() + "." + id);
         LOG.debug(getName() + " created");
         this.reporter = reporter;
+        
+        if (job.getCompressMapOutput()) {
+          Class<? extends CompressionCodec> codecClass =
+            job.getMapOutputCompressorClass(DefaultCodec.class);
+          codec = (CompressionCodec)
+            ReflectionUtils.newInstance(codecClass, job);
+          decompressor = CodecPool.getDecompressor(codec);
+        }
       }
       
       /**
@@ -789,7 +792,7 @@
               shuffleClientMetrics.successFetch();
             } catch (IOException e) {
               LOG.warn(reduceTask.getTaskID() + " copy failed: " +
-                       loc.getMapTaskID() + " from " + loc.getHost());
+                       loc.getTaskAttemptId() + " from " + loc.getHost());
               LOG.warn(StringUtils.stringifyException(e));
               shuffleClientMetrics.failedFetch();
               
@@ -817,99 +820,269 @@
       private long copyOutput(MapOutputLocation loc
                               ) throws IOException, InterruptedException {
         // check if we still need to copy the output from this location
-        if (!neededOutputs.contains(loc.getMapId()) || 
-            obsoleteMapIds.contains(loc.getMapTaskID())) {
+        if (copiedMapOutputs.contains(loc.getTaskId()) || 
+            obsoleteMapIds.contains(loc.getTaskAttemptId())) {
           return CopyResult.OBSOLETE;
         } 
  
-        TaskAttemptID reduceId = reduceTask.getTaskID();
-        LOG.info(reduceId + " Copying " + loc.getMapTaskID() +
-                 " output from " + loc.getHost() + ".");
         // a temp filename. If this file gets created in ramfs, we're fine,
         // else, we will check the localFS to find a suitable final location
         // for this path
+        TaskAttemptID reduceId = reduceTask.getTaskID();
         Path filename = new Path("/" + TaskTracker.getJobCacheSubdir() +
                                  Path.SEPARATOR + getTaskID().getJobID() +
                                  Path.SEPARATOR + reduceId +
                                  Path.SEPARATOR + "output" + "/map_" +
-                                 loc.getMapId() + ".out");
-        // a working filename that will be unique to this attempt
-        Path tmpFilename = new Path(filename + "-" + id);
-        // this copies the map output file
-        tmpFilename = loc.getFile(inMemFileSys, localFileSys, shuffleClientMetrics,
-                                  tmpFilename, lDirAlloc, 
-                                  conf, reduceTask.getPartition(), 
-                                  STALLED_COPY_TIMEOUT, reporter);
-        if (!neededOutputs.contains(loc.getMapId())) {
-          if (tmpFilename != null) {
-            FileSystem fs = tmpFilename.getFileSystem(conf);
-            fs.delete(tmpFilename, true);
-          }
-          return CopyResult.OBSOLETE;
+                                 loc.getTaskId().getId() + ".out");
+        
+        // Copy the map output to a temp file whose name is unique to this attempt 
+        Path tmpMapOutput = new Path(filename+"-"+id);
+        
+        // Copy the map output
+        MapOutput mapOutput = getMapOutput(loc, tmpMapOutput);
+        if (mapOutput == null) {
+          throw new IOException("Failed to fetch map-output for " + 
+                                loc.getTaskAttemptId() + " from " + 
+                                loc.getHost());
         }
-        if (tmpFilename == null)
-          throw new IOException("File " + filename + "-" + id + 
-                                " not created");
-        // This file could have been created in the inmemory
-        // fs or the localfs. So need to get the filesystem owning the path. 
-        FileSystem fs = tmpFilename.getFileSystem(conf);
-        long bytes = -1;
+        
+        // The size of the map-output
+        long bytes = mapOutput.size;
+        
         // lock the ReduceTask while we do the rename
         synchronized (ReduceTask.this) {
-          if (!neededOutputs.contains(loc.getMapId())) {
-            fs.delete(tmpFilename, true);
+          if (copiedMapOutputs.contains(loc.getTaskId())) {
+            mapOutput.discard();
             return CopyResult.OBSOLETE;
           }
+
+          // Special case: discard empty map-outputs
+          if (bytes == 0) {
+            try {
+              mapOutput.discard();
+            } catch (IOException ioe) {
+              LOG.info("Couldn't discard output of " + loc.getTaskId());
+            }
+            
+            // Note that we successfully copied the map-output
+            copiedMapOutputs.add(loc.getTaskId());
+            return bytes;
+          }
           
-          bytes = fs.getFileStatus(tmpFilename).getLen();
-          //resolve the final filename against the directory where the tmpFile
-          //got created
-          filename = new Path(tmpFilename.getParent(), filename.getName());
-          // if we can't rename the file, something is broken (and IOException
-          // will be thrown). 
-          if (!fs.rename(tmpFilename, filename)) {
-            fs.delete(tmpFilename, true);
-            bytes = -1;
-            throw new IOException("failure to rename map output " + 
-                                  tmpFilename);
+          // Process map-output
+          if (mapOutput.inMemory) {
+            // Save it in the synchronized list of map-outputs
+            mapOutputsFilesInMemory.add(mapOutput);
+              
+            //Create a thread to do merges. Synchronize access/update to 
+            //mergeInProgress
+            if (!mergeInProgress && 
+                ((ramManager.getPercentUsed() >= MAX_INMEM_FILESYS_USE  && 
+                  ramManager.getReservedFiles() >= 
+                    (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) || 
+                (mergeThreshold > 0 && 
+                 ramManager.getReservedFiles() >= mergeThreshold)) &&
+                mergeThrowable == null) {
+              LOG.info(reduceId + " RamManager " + 
+                       " is " + ramManager.getPercentUsed() + " full with " + 
+                       mapOutputsFilesInMemory.size() + " files." +
+                       " Triggering merge");
+
+              InMemFSMergeThread m = 
+                new InMemFSMergeThread((LocalFileSystem)localFileSys);
+              m.setName("Thread for merging in-memory files");
+              m.setDaemon(true);
+              mergeInProgress = true;
+              m.start();
+            }
+          } else {
+            // Rename the temporary file to the final file; 
+            // ensure it is on the same partition
+            tmpMapOutput = mapOutput.file;
+            filename = new Path(tmpMapOutput.getParent(), filename.getName());
+            if (!localFileSys.rename(tmpMapOutput, filename)) {
+              localFileSys.delete(tmpMapOutput, true);
+              bytes = -1;
+              throw new IOException("Failed to rename map output " + 
+                  tmpMapOutput + " to " + filename);
+            }
+
+            synchronized (mapOutputFilesOnDisk) {        
+              mapOutputFilesOnDisk.add(localFileSys.getFileStatus(filename));
+            }
           }
+
+          // Note that we successfully copied the map-output
+          copiedMapOutputs.add(loc.getTaskId());
+        }
+        
+        return bytes;
+      }
+      
+
+      /**
+       * Get the map output into a local file (either in the inmemory fs or on the 
+       * local fs) from the remote server.
+       * We use the file system so that we generate checksum files on the data.
+       * @param mapOutputLoc map-output to be fetched
+       * @param localFilename the filename to write the data into
+       * @param connectionTimeout number of milliseconds for connection timeout
+       * @param readTimeout number of milliseconds for read timeout
+       * @return the path of the file that got created
+       * @throws IOException when something goes wrong
+       */
+      private MapOutput getMapOutput(MapOutputLocation mapOutputLoc, 
+                                     Path localFilename)
+      throws IOException, InterruptedException {
+        boolean good = false;
+        OutputStream output = null;
+        MapOutput mapOutput = null;
+        
+        try {
+          URLConnection connection = 
+            mapOutputLoc.getOutputLocation().openConnection();
+          InputStream input = getInputStream(connection, DEFAULT_READ_TIMEOUT, 
+                                             STALLED_COPY_TIMEOUT);
+          //We will put a file in memory if it meets certain criteria:
+          //1. The size of the (decompressed) file should be less than 25% of 
+          //    the total inmem fs
+          //2. There is space available in the inmem fs
+          
+          long decompressedLength = 
+            Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));  
+          long compressedLength = 
+            Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
           
-          LOG.info(reduceId + " done copying " + loc.getMapTaskID() +
-                   " output from " + loc.getHost() + ".");
-          //Create a thread to do merges. Synchronize access/update to 
-          //mergeInProgress
-          if (!mergeInProgress && 
-              (inMemFileSys.getPercentUsed() >= MAX_INMEM_FILESYS_USE || 
-               (mergeThreshold > 0 && 
-                inMemFileSys.getNumFiles(MAP_OUTPUT_FILTER) >= 
-                mergeThreshold))&&
-              mergeThrowable == null) {
-            LOG.info(reduceId + " InMemoryFileSystem " + 
-                     inMemFileSys.getUri().toString() +
-                     " is " + inMemFileSys.getPercentUsed() + 
-                     " full. Triggering merge");
-            InMemFSMergeThread m = new InMemFSMergeThread(inMemFileSys,
-                                                          (LocalFileSystem)localFileSys, sorter);
-            m.setName("Thread for merging in memory files");
-            m.setDaemon(true);
-            mergeInProgress = true;
-            m.start();
+          // Check if we can save the map-output in-memory
+          boolean createInMem = ramManager.reserve(decompressedLength); 
+          if (createInMem) {
+            LOG.info("Shuffling " + decompressedLength + " bytes (" + 
+                     compressedLength + " raw bytes) " + 
+                     "into RAM-FS from " + mapOutputLoc.getTaskAttemptId());
+            
+            // Are map-outputs compressed?
+            if (codec != null) {
+              decompressor.reset();
+              input = codec.createInputStream(input, decompressor);
+            }
+
+            output = new DataOutputBuffer((int)decompressedLength);
+          }
+          else {
+            // Find out a suitable location for the output on local-filesystem
+            localFilename = lDirAlloc.getLocalPathForWrite(
+                localFilename.toUri().getPath(), decompressedLength, conf);
+            LOG.info("Shuffling " + decompressedLength + " bytes (" + 
+                     compressedLength + " raw bytes) " + 
+                     "into Local-FS from " + mapOutputLoc.getTaskAttemptId());
+            output = localFileSys.create(localFilename);
+          }
+          
+          long bytesRead = 0;
+          try {  
+            try {
+              byte[] buf = new byte[64 * 1024];
+
+              int n = input.read(buf, 0, buf.length);
+              while (n > 0) {
+                bytesRead += n;
+                shuffleClientMetrics.inputBytes(n);
+                output.write(buf, 0, n);
+
+                // indicate we're making progress
+                reporter.progress();
+                n = input.read(buf, 0, buf.length);
+              }
+              
+              LOG.info("Read " + bytesRead + " bytes from map-output " +
+                       "for " + mapOutputLoc.getTaskAttemptId());
+              
+              mapOutput = 
+                  (createInMem) ? 
+                      new MapOutput(mapOutputLoc.getTaskId(), 
+                                    ((DataOutputBuffer)output).getData()) :
+                      new MapOutput(mapOutputLoc.getTaskId(), conf, 
+                                    localFileSys.makeQualified(localFilename), 
+                                    compressedLength);
+              
+            } finally {
+              output.close();
+            }
+          } finally {
+            input.close();
+          }
+          
+          // Sanity check
+          good = createInMem ? (bytesRead == decompressedLength) : 
+                               (bytesRead == compressedLength);
+          if (!good) {
+            throw new IOException("Incomplete map output received for " +
+                                  mapOutputLoc.getTaskAttemptId() + " from " +
+                                  mapOutputLoc.getOutputLocation() + " (" + 
+                                  bytesRead + " instead of " + 
+                                  decompressedLength + ")"
+                                 );
+          }
+        } finally {
+          if (!good) {
+            try {
+              if (mapOutput != null) {
+                mapOutput.discard();
+              }
+            } catch (Throwable th) {
+              // IGNORED because we are cleaning up
+            }
           }
-          neededOutputs.remove(loc.getMapId());
         }
         
-        // Check if the map output file hits the local file-system by checking 
-        // their schemes
-        String localFSScheme = localFileSys.getUri().getScheme();
-        String outputFileScheme = fs.getUri().getScheme();
-        if (localFSScheme.equals(outputFileScheme)) {
-          synchronized (mapOutputFilesOnDisk) {        
-            mapOutputFilesOnDisk.add(fs.getFileStatus(filename));
+        return mapOutput;
+      }
+
+      /** 
+       * The connection establishment is attempted multiple times and is given up 
+       * only on the last failure. Instead of connecting with a timeout of 
+       * X, we try connecting with a timeout of x < X but multiple times. 
+       */
+      private InputStream getInputStream(URLConnection connection, 
+                                         int connectionTimeout, 
+                                         int readTimeout) 
+      throws IOException {
+        int unit = 0;
+        if (connectionTimeout < 0) {
+          throw new IOException("Invalid timeout "
+                                + "[timeout = " + connectionTimeout + " ms]");
+        } else if (connectionTimeout > 0) {
+          unit = (UNIT_CONNECT_TIMEOUT > connectionTimeout)
+                 ? connectionTimeout
+                 : UNIT_CONNECT_TIMEOUT;
+        }
+        // set the read timeout to the total timeout
+        connection.setReadTimeout(readTimeout);
+        // set the connect timeout to the unit-connect-timeout
+        connection.setConnectTimeout(unit);
+        while (true) {
+          try {
+            return connection.getInputStream();
+          } catch (IOException ioe) {
+            // update the total remaining connect-timeout
+            connectionTimeout -= unit;
+
+            // throw an exception if we have waited for timeout amount of time
+            // note that the updated value if timeout is used here
+            if (connectionTimeout == 0) {
+              throw ioe;
+            }
+
+            // reset the connect timeout for the last try
+            if (connectionTimeout < unit) {
+              unit = connectionTimeout;
+              // reset the connect time out for the final connect
+              connection.setConnectTimeout(unit);
+            }
           }
         }
-        return bytes;
       }
-      
+
     }
     
     private void configureClasspath(JobConf conf)
@@ -978,21 +1151,13 @@
       this.maxFetchRetriesPerMap = getClosestPowerOf2((this.maxBackoff * 1000 
                                                        / BACKOFF_INIT) + 1); 
       this.mergeThreshold = conf.getInt("mapred.inmem.merge.threshold", 1000);
+
+      // Setup the RamManager
+      ramManager = new RamManager(conf);
+      ramfsMergeOutputSize = 
+        (long)(MAX_INMEM_FILESYS_USE * ramManager.getMemoryLimit());
       
-      //we want to distinguish inmem fs instances for different reduces. Hence,
-      //append a unique string in the uri for the inmem fs name
-      URI uri = URI.create("ramfs://mapoutput" + reduceTask.hashCode());
-      inMemFileSys = (InMemoryFileSystem)FileSystem.get(uri, conf);
-      LOG.info(reduceTask.getTaskID() + " Created an InMemoryFileSystem, uri: "
-               + uri);
-      ramfsMergeOutputSize = (long)(MAX_INMEM_FILESYS_USE * 
-                                    inMemFileSys.getFSSize());
       localFileSys = FileSystem.getLocal(conf);
-      //create an instance of the sorter
-      sorter =
-        new SequenceFile.Sorter(inMemFileSys, conf.getOutputKeyComparator(), 
-            conf.getMapOutputKeyClass(), conf.getMapOutputValueClass(), conf);
-      sorter.setProgressable(getReporter(umbilical));
       
       // hosts -> next contact time
       this.penaltyBox = new LinkedHashMap<String, Long>();
@@ -1012,6 +1177,7 @@
       this.maxMapRuntime = 0;
     }
     
+    @SuppressWarnings("unchecked")
     public boolean fetchOutputs() throws IOException {
       final int      numOutputs = reduceTask.getNumMaps();
       List<MapOutputLocation> knownOutputs = 
@@ -1024,24 +1190,18 @@
         reduceTask.getProgress().phase();
       
       for (int i = 0; i < numOutputs; i++) {
-        neededOutputs.add(i);
         copyPhase.addPhase();       // add sub-phase per file
       }
       
-      copiers = new MapOutputCopier[numCopiers];
+      copiers = new ArrayList<MapOutputCopier>(numCopiers);
       
       Reporter reporter = getReporter(umbilical);
-      // create an instance of the sorter for merging the on-disk files
-      SequenceFile.Sorter localFileSystemSorter = 
-        new SequenceFile.Sorter(localFileSys, conf.getOutputKeyComparator(), 
-                                conf.getMapOutputKeyClass(),
-                                conf.getMapOutputValueClass(), conf);
-      localFileSystemSorter.setProgressable(reporter);
-      
+
       // start all the copying threads
-      for (int i=0; i < copiers.length; i++) {
-        copiers[i] = new MapOutputCopier(reporter);
-        copiers[i].start();
+      for (int i=0; i < numCopiers; i++) {
+        MapOutputCopier copier = new MapOutputCopier(conf, reporter);
+        copiers.add(copier);
+        copier.start();
       }
       
       // start the clock for bandwidth measurement
@@ -1051,9 +1211,8 @@
       long lastOutputTime = 0;
       IntWritable fromEventId = new IntWritable(0);
       
-      try {
         // loop until we get all required outputs
-        while (!neededOutputs.isEmpty() && mergeThrowable == null) {
+        while (copiedMapOutputs.size() < numOutputs && mergeThrowable == null) {
           
           currentTime = System.currentTimeMillis();
           boolean logNow = false;
@@ -1063,8 +1222,8 @@
           }
           if (logNow) {
             LOG.info(reduceTask.getTaskID() + " Need another " 
-                   + neededOutputs.size() + " map output(s) where " 
-                   + numInFlight + " is already in progress");
+                   + (numOutputs - copiedMapOutputs.size()) + " map output(s) "
+                   + "where " + numInFlight + " is already in progress");
           }
           
           try {
@@ -1129,7 +1288,7 @@
               MapOutputLocation loc = locIt.next();
               
               // Do not schedule fetches from OBSOLETE maps
-              if (obsoleteMapIds.contains(loc.getMapTaskID())) {
+              if (obsoleteMapIds.contains(loc.getTaskAttemptId())) {
                 locIt.remove();
                 continue;
               }
@@ -1178,9 +1337,11 @@
               // make sure that only one thread merges the disk files
               localFSMergeInProgress = true;
               // start the on-disk-merge process
+              LOG.info(reduceTask.getTaskID() + "We have  " + 
+                       mapOutputFilesOnDisk.size() + " map outputs on disk. " +
+                       "Triggering merge of " + ioSortFactor + " files");
               LocalFSMerger lfsm =  
-                new LocalFSMerger((LocalFileSystem)localFileSys, 
-                                  localFileSystemSorter);
+                new LocalFSMerger((LocalFileSystem)localFileSys);
               lfsm.setName("Thread for merging on-disk files");
               lfsm.setDaemon(true);
               lfsm.start();
@@ -1221,19 +1382,19 @@
                 
                 // Note successfull fetch for this mapId to invalidate
                 // (possibly) old fetch-failures
-                fetchFailedMaps.remove(cr.getLocation().getMapId());
+                fetchFailedMaps.remove(cr.getLocation().getTaskId());
               } else if (cr.isObsolete()) {
                 //ignore
                 LOG.info(reduceTask.getTaskID() + 
                          " Ignoring obsolete copy result for Map Task: " + 
-                         cr.getLocation().getMapTaskID() + " from host: " + 
+                         cr.getLocation().getTaskAttemptId() + " from host: " + 
                          cr.getHost());
               } else {
                 retryFetches.add(cr.getLocation());
                 
                 // note the failed-fetch
-                TaskAttemptID mapTaskId = cr.getLocation().getMapTaskID();
-                Integer mapId = cr.getLocation().getMapId();
+                TaskAttemptID mapTaskId = cr.getLocation().getTaskAttemptId();
+                TaskID mapId = cr.getLocation().getTaskId();
                 
                 totalFailures++;
                 Integer noFailedFetches = 
@@ -1351,10 +1512,10 @@
         // all done, inform the copiers to exit
         synchronized (copiers) {
           synchronized (scheduledCopies) {
-            for (int i=0; i < copiers.length; i++) {
-              copiers[i].interrupt();
-              copiers[i] = null;
+            for (MapOutputCopier copier : copiers) {
+              copier.interrupt();
             }
+            copiers.clear();
           }
         }
         
@@ -1372,52 +1533,63 @@
             }
             LOG.info(reduceTask.getTaskID() + 
                      " Copying of all map outputs complete. " + 
-                     "Initiating the last merge on the remaining files in " + 
-                     inMemFileSys.getUri());
+                     "Initiating the last merge on the remaining files " +
+                     "in-memory");
             if (mergeThrowable != null) {
               //this could happen if the merge that
               //was in progress threw an exception
               throw mergeThrowable;
             }
             //initiate merge
-            Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
-            if (inMemClosedFiles.length == 0) {
-              LOG.info(reduceTask.getTaskID() + "Nothing to merge from " + 
-                       inMemFileSys.getUri());
-              return neededOutputs.isEmpty();
+            if (mapOutputsFilesInMemory.size() == 0) {
+              LOG.info(reduceTask.getTaskID() + "Nothing to merge from " +
+              		     "in-memory map-outputs");
+              return (copiedMapOutputs.size() == numOutputs);
             }
             //name this output file same as the name of the first file that is 
             //there in the current list of inmem files (this is guaranteed to be
             //absent on the disk currently. So we don't overwrite a prev. 
             //created spill). Also we need to create the output file now since
             //it is not guaranteed that this file will be present after merge
-            //is called (we delete empty sequence files as soon as we see them
+            //is called (we delete empty map-output files as soon as we see them
             //in the merge method)
-            int mapId = extractMapIdFromPathName(inMemClosedFiles[0]);
-            Path outputPath = mapOutputFile.getInputFileForWrite(mapId, 
-                             reduceTask.getTaskID(), ramfsMergeOutputSize);
-            SequenceFile.Writer writer = sorter.cloneFileAttributes(
-                                                                    inMemFileSys.makeQualified(inMemClosedFiles[0]), 
-                                                                    localFileSys.makeQualified(outputPath), null);
-            
-            SequenceFile.Sorter.RawKeyValueIterator rIter = null;
+            TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
+            Path outputPath = 
+              localFileSys.makeQualified(
+                  mapOutputFile.getInputFileForWrite(mapId, 
+                                                     reduceTask.getTaskID(), 
+                                                     ramfsMergeOutputSize));
+            Writer writer = 
+              new Writer(conf, localFileSys, outputPath,
+                         conf.getMapOutputKeyClass(), 
+                         conf.getMapOutputValueClass(),
+                         codec);
+            List<Segment<K, V>> inMemorySegments = createInMemorySegments();
+            int noInMemSegments = inMemorySegments.size();
+            RawKeyValueIterator rIter = null;
             try {
-              rIter = sorter.merge(inMemClosedFiles, true, 
-                                   inMemClosedFiles.length, 
-                                   new Path(reduceTask.getTaskID().toString()));
+              rIter = Merger.merge(conf, localFileSys,
+                                   (Class<K>)conf.getMapOutputKeyClass(), 
+                                   (Class<V>)conf.getMapOutputValueClass(),
+                                   inMemorySegments, inMemorySegments.size(), 
+                                   new Path(reduceTask.getTaskID().toString()), 
+                                   conf.getOutputKeyComparator(), reporter); 
+
+              Merger.writeFile(rIter, writer, reporter);
+              writer.close();
             } catch (Exception e) { 
               //make sure that we delete the ondisk file that we created earlier
               //when we invoked cloneFileAttributes
               writer.close();
-              localFileSys.delete(inMemClosedFiles[0], true);
+              localFileSys.delete(outputPath, true);
               throw new IOException (StringUtils.stringifyException(e));
             }
-            sorter.writeFile(rIter, writer);
-            writer.close();
             LOG.info(reduceTask.getTaskID() +
-                     " Merge of the " +inMemClosedFiles.length +
+                     " Merge of the " + noInMemSegments +
                      " files in InMemoryFileSystem complete." +
-                     " Local file is " + outputPath);
+                     " Local file is " + outputPath +
+                     " of size " + 
+                     localFileSys.getFileStatus(outputPath).getLen());
             
             FileStatus status = localFileSys.getFileStatus(outputPath);
             synchronized (mapOutputFilesOnDisk) {
@@ -1434,12 +1606,26 @@
             return false;
           }
         }
-        return mergeThrowable == null && neededOutputs.isEmpty();
-      } finally {
-        inMemFileSys.close();
-      }
+        return mergeThrowable == null && copiedMapOutputs.size() == numOutputs;
     }
     
+    private List<Segment<K, V>> createInMemorySegments() {
+      List<Segment<K, V>> inMemorySegments = 
+        new LinkedList<Segment<K, V>>();
+      synchronized (mapOutputsFilesInMemory) {
+        while(mapOutputsFilesInMemory.size() > 0) {
+          MapOutput mo = mapOutputsFilesInMemory.remove(0);
+          
+          Reader<K, V> reader = 
+            new InMemoryReader<K, V>(ramManager, 
+                                     mo.data, 0, mo.data.length);
+          Segment<K, V> segment = 
+            new Segment<K, V>(reader, true);
+          inMemorySegments.add(segment);
+        }
+      }
+      return inMemorySegments;
+    }
     
     private CopyResult getCopyResult() {  
       synchronized (copyResults) {
@@ -1501,9 +1687,7 @@
           {
             URI u = URI.create(event.getTaskTrackerHttp());
             String host = u.getHost();
-            int port = u.getPort();
-            TaskAttemptID taskId = event.getTaskID();
-            int mId = event.idWithinJob();
+            TaskAttemptID taskId = event.getTaskAttemptId();
             int duration = event.getTaskRunTime();
             if (duration > maxMapRuntime) {
               maxMapRuntime = duration; 
@@ -1511,23 +1695,28 @@
               maxFetchRetriesPerMap = 
                   getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1);
             }
-            knownOutputs.add(new MapOutputLocation(taskId, mId, host, port));
+            URL mapOutputLocation = new URL(event.getTaskTrackerHttp() + 
+                                    "/mapOutput?job=" + taskId.getJobID() +
+                                    "&map=" + taskId + 
+                                    "&reduce=" + getPartition());
+            knownOutputs.add(new MapOutputLocation(taskId, host, 
+                                                   mapOutputLocation));
           }
           break;
           case FAILED:
           case KILLED:
           case OBSOLETE:
           {
-            obsoleteMapIds.add(event.getTaskID());
+            obsoleteMapIds.add(event.getTaskAttemptId());
             LOG.info("Ignoring obsolete output of " + event.getTaskStatus() + 
-                     " map-task: '" + event.getTaskID() + "'");
+                     " map-task: '" + event.getTaskAttemptId() + "'");
           }
           break;
           case TIPFAILED:
           {
-            neededOutputs.remove(event.idWithinJob());
+            copiedMapOutputs.add(event.getTaskAttemptId().getTaskID());
             LOG.info("Ignoring output of failed map TIP: '" +  
-            		 event.getTaskID() + "'");
+            		 event.getTaskAttemptId() + "'");
           }
           break;
         }
@@ -1542,17 +1731,15 @@
      */
     private class LocalFSMerger extends Thread {
       private LocalFileSystem localFileSys;
-      private SequenceFile.Sorter sorter;
 
-      public LocalFSMerger(LocalFileSystem fs, SequenceFile.Sorter sorter) {
+      public LocalFSMerger(LocalFileSystem fs) {
         this.localFileSys = fs;
-        this.sorter = sorter;
       }
 
-      @Override
+      @SuppressWarnings("unchecked")
       public void run() {
         try {
-          Path[] mapFiles = new Path[ioSortFactor];
+          List<Path> mapFiles = new ArrayList<Path>();
           long approxOutputSize = 0;
           int bytesPerSum = 
             reduceTask.getConf().getInt("io.bytes.per.checksum", 512);
@@ -1565,10 +1752,16 @@
             for (int i = 0; i < ioSortFactor; ++i) {
               FileStatus filestatus = mapOutputFilesOnDisk.first();
               mapOutputFilesOnDisk.remove(filestatus);
-              mapFiles[i] = filestatus.getPath();
+              mapFiles.add(filestatus.getPath());
               approxOutputSize += filestatus.getLen();
             }
           }
+          
+          // sanity check
+          if (mapFiles.size() == 0) {
+              return;
+          }
+          
           // add the checksum length
           approxOutputSize += ChecksumFileSystem
                               .getChecksumLength(approxOutputSize,
@@ -1576,29 +1769,42 @@
 
           // 2. Start the on-disk merge process
           Path outputPath = 
-            lDirAlloc.getLocalPathForWrite(mapFiles[0].toString(), 
+            lDirAlloc.getLocalPathForWrite(mapFiles.get(0).toString(), 
                                            approxOutputSize, conf)
             .suffix(".merged");
-          SequenceFile.Writer writer =
-            sorter.cloneFileAttributes(mapFiles[0], outputPath, null);
-          SequenceFile.Sorter.RawKeyValueIterator iter  = null;
+          Writer writer = 
+            new Writer(conf, localFileSys, outputPath, 
+                       conf.getMapOutputKeyClass(), 
+                       conf.getMapOutputValueClass(),
+                       codec);
+          RawKeyValueIterator iter  = null;
           Path tmpDir = new Path(reduceTask.getTaskID().toString());
+          final Reporter reporter = getReporter(umbilical);
           try {
-            iter = sorter.merge(mapFiles, true, ioSortFactor, tmpDir);
+            iter = Merger.merge(conf, localFileSys,
+                                conf.getMapOutputKeyClass(),
+                                conf.getMapOutputValueClass(),
+                                codec, mapFiles.toArray(new Path[mapFiles.size()]), 
+                                true, ioSortFactor, tmpDir, 
+                                conf.getOutputKeyComparator(), reporter);
           } catch (Exception e) {
             writer.close();
             localFileSys.delete(outputPath, true);
             throw new IOException (StringUtils.stringifyException(e));
           }
-          sorter.writeFile(iter, writer);
+          Merger.writeFile(iter, writer, reporter);
           writer.close();
           
           synchronized (mapOutputFilesOnDisk) {
             mapOutputFilesOnDisk.add(localFileSys.getFileStatus(outputPath));
           }
           
-          LOG.info(reduceTask.getTaskID()
-                   + " Finished merging map output files on disk.");
+          LOG.info(reduceTask.getTaskID() +
+                   " Finished merging " + mapFiles.size() + 
+                   " map output files on disk of total-size " + 
+                   approxOutputSize + "." + 
+                   " Local output file is " + outputPath + " of size " +
+                   localFileSys.getFileStatus(outputPath).getLen());
         } catch (Throwable t) {
           LOG.warn(reduceTask.getTaskID()
                    + " Merging of the local FS files threw an exception: "
@@ -1613,54 +1819,52 @@
     }
 
     private class InMemFSMergeThread extends Thread {
-      private InMemoryFileSystem inMemFileSys;
       private LocalFileSystem localFileSys;
-      private SequenceFile.Sorter sorter;
       
-      public InMemFSMergeThread(InMemoryFileSystem inMemFileSys, 
-                                LocalFileSystem localFileSys, SequenceFile.Sorter sorter) {
-        this.inMemFileSys = inMemFileSys;
+      public InMemFSMergeThread( LocalFileSystem localFileSys) {
         this.localFileSys = localFileSys;
-        this.sorter = sorter;
       }
-      @Override
+      
+      @SuppressWarnings("unchecked")
       public void run() {
         LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
         try {
-          Path[] inMemClosedFiles;
-          //initiate merge
-          synchronized (ReduceTask.this) {
-            inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
-          }
-          //Note that the above Path[] could be of length 0 if all copies are 
-          //in flight. So we make sure that we have some 'closed' map
-          //output files to merge to get the benefit of in-memory merge
-          if (inMemClosedFiles.length >= 
-              (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
+          if (mapOutputsFilesInMemory.size() >= 
+                 (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
             //name this output file same as the name of the first file that is 
             //there in the current list of inmem files (this is guaranteed to
             //be absent on the disk currently. So we don't overwrite a prev. 
             //created spill). Also we need to create the output file now since
             //it is not guaranteed that this file will be present after merge
-            //is called (we delete empty sequence files as soon as we see them
+            //is called (we delete empty files as soon as we see them
             //in the merge method)
 
             //figure out the mapId 
-            int mapId = extractMapIdFromPathName(inMemClosedFiles[0]);
+            TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
             
             Path outputPath = mapOutputFile.getInputFileForWrite(mapId, 
                               reduceTask.getTaskID(), ramfsMergeOutputSize);
 
-            SequenceFile.Writer writer = sorter.cloneFileAttributes(
-                                                                    inMemFileSys.makeQualified(inMemClosedFiles[0]), 
-                                                                    localFileSys.makeQualified(outputPath), null);
-            SequenceFile.Sorter.RawKeyValueIterator rIter;
+            Writer writer = 
+              new Writer(conf, localFileSys, outputPath,
+                         conf.getMapOutputKeyClass(),
+                         conf.getMapOutputValueClass(),
+                         codec);
+            
+            List<Segment<K, V>> inMemorySegments = createInMemorySegments();
+            int noInMemorySegments = inMemorySegments.size();
+            
+            RawKeyValueIterator rIter = null;
+            final Reporter reporter = getReporter(umbilical);
             try {
-              rIter = sorter.merge(inMemClosedFiles, true, 
-                                   inMemClosedFiles.length, 
-                                   new Path(reduceTask.getTaskID().toString()));
+              rIter = Merger.merge(conf, localFileSys,
+                                   (Class<K>)conf.getMapOutputKeyClass(),
+                                   (Class<V>)conf.getMapOutputValueClass(),
+                                   inMemorySegments, inMemorySegments.size(),
+                                   new Path(reduceTask.getTaskID().toString()),
+                                   conf.getOutputKeyComparator(), reporter);
               if (null == combinerClass) {
-                sorter.writeFile(rIter, writer);
+                Merger.writeFile(rIter, writer, reporter);
               } else {
                 combineCollector.setWriter(writer);
                 combineAndSpill(rIter, reduceCombineInputCounter);
@@ -1675,9 +1879,10 @@
             }
             writer.close();
             LOG.info(reduceTask.getTaskID() + 
-                     " Merge of the " +inMemClosedFiles.length +
-                     " files in InMemoryFileSystem complete." +
-                     " Local file is " + outputPath);
+                     " Merge of the " + noInMemorySegments +
+                     " files in-memory complete." +
+                     " Local file is " + outputPath + " of size " + 
+                     localFileSys.getFileStatus(outputPath).getLen());
             
             FileStatus status = localFileSys.getFileStatus(outputPath);
             synchronized (mapOutputFilesOnDisk) {
@@ -1685,8 +1890,7 @@
             }
           }
           else {
-            LOG.info(reduceTask.getTaskID() + " Nothing to merge from " + 
-                     inMemFileSys.getUri());
+            LOG.info(reduceTask.getTaskID() + " Nothing to merge from map-outputs in-memory");
           }
         } catch (Throwable t) {
           LOG.warn(reduceTask.getTaskID() +
@@ -1699,15 +1903,10 @@
         }
       }
     }
-    final private PathFilter MAP_OUTPUT_FILTER = new PathFilter() {
-        public boolean accept(Path file) {
-          return file.toString().endsWith(".out");
-        }     
-      };
 
     @SuppressWarnings("unchecked")
     private void combineAndSpill(
-        SequenceFile.Sorter.RawKeyValueIterator kvIter,
+        RawKeyValueIterator kvIter,
         Counters.Counter inCounter) throws IOException {
       JobConf job = (JobConf)getConf();
       Reducer combiner =

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java Wed Jun  4 21:06:13 2008
@@ -24,7 +24,9 @@
 import java.net.URI;
 import java.text.NumberFormat;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
@@ -40,13 +42,16 @@
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.kfs.KosmosFileSystem;
 import org.apache.hadoop.fs.s3.S3FileSystem;
+import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
@@ -602,28 +607,141 @@
   /**
    * OutputCollector for the combiner.
    */
-  protected static class CombineOutputCollector implements OutputCollector {
-    private SequenceFile.Writer writer;
+  protected static class CombineOutputCollector<K extends Object, V extends Object> 
+  implements OutputCollector<K, V> {
+    private Writer<K, V> writer;
     private Counters.Counter outCounter;
     public CombineOutputCollector(Counters.Counter outCounter) {
       this.outCounter = outCounter;
     }
-    public synchronized void setWriter(SequenceFile.Writer writer) {
+    public synchronized void setWriter(Writer<K, V> writer) {
       this.writer = writer;
     }
-    public synchronized void collect(Object key, Object value)
+    public synchronized void collect(K key, V value)
         throws IOException {
       outCounter.increment(1);
       writer.append(key, value);
     }
   }
 
+  /** Iterates values while keys match in sorted input. */
+  static class ValuesIterator<KEY,VALUE> implements Iterator<VALUE> {
+    protected RawKeyValueIterator in; //input iterator
+    private KEY key;               // current key
+    private KEY nextKey;
+    private VALUE value;             // current value
+    private boolean hasNext;                      // more w/ this key
+    private boolean more;                         // more in file
+    private RawComparator<KEY> comparator;
+    protected Progressable reporter;
+    private Deserializer<KEY> keyDeserializer;
+    private Deserializer<VALUE> valDeserializer;
+    private DataInputBuffer keyIn = new DataInputBuffer();
+    private DataInputBuffer valueIn = new DataInputBuffer();
+    
+    @SuppressWarnings("unchecked")
+    public ValuesIterator (RawKeyValueIterator in, 
+                           RawComparator<KEY> comparator, 
+                           Class<KEY> keyClass,
+                           Class<VALUE> valClass, Configuration conf, 
+                           Progressable reporter)
+      throws IOException {
+      this.in = in;
+      this.comparator = comparator;
+      this.reporter = reporter;
+      SerializationFactory serializationFactory = new SerializationFactory(conf);
+      this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+      this.keyDeserializer.open(keyIn);
+      this.valDeserializer = serializationFactory.getDeserializer(valClass);
+      this.valDeserializer.open(this.valueIn);
+      readNextKey();
+      key = nextKey;
+      nextKey = null; // force new instance creation
+      hasNext = more;
+    }
+
+    RawKeyValueIterator getRawIterator() { return in; }
+    
+    /// Iterator methods
+
+    public boolean hasNext() { return hasNext; }
+
+    private int ctr = 0;
+    public VALUE next() {
+      if (!hasNext) {
+        throw new NoSuchElementException("iterate past last value");
+      }
+      try {
+        readNextValue();
+        readNextKey();
+      } catch (IOException ie) {
+        throw new RuntimeException("problem advancing post rec#"+ctr, ie);
+      }
+      reporter.progress();
+      return value;
+    }
+
+    public void remove() { throw new RuntimeException("not implemented"); }
+
+    /// Auxiliary methods
+
+    /** Start processing next unique key. */
+    void nextKey() throws IOException {
+      // read until we find a new key
+      while (hasNext) { 
+        readNextKey();
+      }
+      ++ctr;
+      
+      // move the next key to the current one
+      KEY tmpKey = key;
+      key = nextKey;
+      nextKey = tmpKey;
+      hasNext = more;
+    }
+
+    /** True iff more keys remain. */
+    boolean more() { 
+      return more; 
+    }
+
+    /** The current key. */
+    KEY getKey() { 
+      return key; 
+    }
+
+    /** 
+     * read the next key 
+     */
+    private void readNextKey() throws IOException {
+      more = in.next();
+      if (more) {
+        DataInputBuffer nextKeyBytes = in.getKey();
+        keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength());
+        nextKey = keyDeserializer.deserialize(nextKey);
+        hasNext = key != null && (comparator.compare(key, nextKey) == 0);
+      } else {
+        hasNext = false;
+      }
+    }
+
+    /**
+     * Read the next value
+     * @throws IOException
+     */
+    private void readNextValue() throws IOException {
+      DataInputBuffer nextValueBytes = in.getValue();
+      valueIn.reset(nextValueBytes.getData(), nextValueBytes.getPosition(), nextValueBytes.getLength());
+      value = valDeserializer.deserialize(value);
+    }
+  }
+
   protected static class CombineValuesIterator<KEY,VALUE>
       extends ValuesIterator<KEY,VALUE> {
 
     private final Counters.Counter combineInputCounter;
 
-    public CombineValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in,
+    public CombineValuesIterator(RawKeyValueIterator in,
         RawComparator<KEY> comparator, Class<KEY> keyClass,
         Class<VALUE> valClass, Configuration conf, Reporter reporter,
         Counters.Counter combineInputCounter) throws IOException {

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java Wed Jun  4 21:06:13 2008
@@ -79,7 +79,7 @@
   /**
    * Returns task id. 
    * @return task id
-   * @deprecated use {@link #getTaskID()} instead.
+   * @deprecated use {@link #getTaskAttemptId()} instead.
    */
   @Deprecated
   public String getTaskId() {
@@ -90,7 +90,7 @@
    * Returns task id. 
    * @return task id
    */
-  public TaskAttemptID getTaskID() {
+  public TaskAttemptID getTaskAttemptId() {
     return taskId;
   }
   

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Jun  4 21:06:13 2008
@@ -2346,18 +2346,24 @@
         indexIn = fileSys.open(indexFileName);
 
         //seek to the correct offset for the given reduce
-        indexIn.seek(reduce * 16);
+        indexIn.seek(reduce * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
           
         //read the offset and length of the partition data
         long startOffset = indexIn.readLong();
+        long rawPartLength = indexIn.readLong();
         long partLength = indexIn.readLong();
 
         indexIn.close();
         indexIn = null;
           
+        //set the custom "Raw-Map-Output-Length" http header to 
+        //the raw (decompressed) length
+        response.setHeader(RAW_MAP_OUTPUT_LENGTH, Long.toString(rawPartLength));
+
         //set the custom "Map-Output-Length" http header to 
         //the actual number of bytes being transferred
-        response.setHeader(MAP_OUTPUT_LENGTH, Long.toString(partLength));
+        response.setHeader(MAP_OUTPUT_LENGTH, 
+                           Long.toString(partLength));
 
         //use the same buffersize as used for reading the data from disk
         response.setBufferSize(MAX_BYTES_TO_READ);
@@ -2390,6 +2396,10 @@
                                  (partLength - totalRead) < MAX_BYTES_TO_READ
                                  ? (int)(partLength - totalRead) : MAX_BYTES_TO_READ);
         }
+        
+        LOG.info("Sent out " + totalRead + " bytes for reduce: " + reduce + 
+                 " from map: " + mapId + " given " + partLength + "/" + 
+                 rawPartLength);
       } catch (IOException ie) {
         TaskTracker tracker = 
           (TaskTracker) context.getAttribute("task.tracker");

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java Wed Jun  4 21:06:13 2008
@@ -171,6 +171,17 @@
     }
   }
 
+  /**
+   * Return the correctly-typed {@link Class} of the given object.
+   *  
+   * @param o object whose correctly-typed <code>Class</code> is to be obtained
+   * @return the correctly typed <code>Class</code> of the given object.
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> Class<T> getClass(T o) {
+    return (Class<T>)o.getClass();
+  }
+  
   // methods to support testing
   static void clearCache() {
     CONSTRUCTOR_CACHE.clear();

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java Wed Jun  4 21:06:13 2008
@@ -45,7 +45,7 @@
       return null;
     }
     
-    public Class getCompressorType() {
+    public Class<? extends Compressor> getCompressorType() {
       return null;
     }
 
@@ -70,7 +70,7 @@
       return null;
     }
 
-    public Class getDecompressorType() {
+    public Class<? extends Decompressor> getDecompressorType() {
       return null;
     }
 

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java Wed Jun  4 21:06:13 2008
@@ -98,7 +98,7 @@
     TaskCompletionEvent[] events1 = rj1.getTaskCompletionEvents(0);
     assertEquals(events0.length, events1.length);    
     for (int i = 0; i < events0.length; i++) {
-      assertEquals(events0[i].getTaskID(), events1[i].getTaskID());
+      assertEquals(events0[i].getTaskAttemptId(), events1[i].getTaskAttemptId());
       assertEquals(events0[i].getTaskStatus(), events1[i].getTaskStatus());
     }
   }