You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/11/10 22:11:56 UTC

svn commit: r473481 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/io/SequenceFile.java

Author: cutting
Date: Fri Nov 10 13:11:55 2006
New Revision: 473481

URL: http://svn.apache.org/viewvc?view=rev&rev=473481
Log:
HADOOP-611.  Add support for iterator-based merging to SequenceFile.  Contributed by Devaraj.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=473481&r1=473480&r2=473481
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Nov 10 13:11:55 2006
@@ -41,6 +41,9 @@
 12. HADOOP-696.  Fix TestTextInputFormat unit test to not rely on the
     order of directory listings.  (Sameer Paranjpye via cutting)
 
+13. HADOOP-611.  Add support for iterator-based merging to
+    SequenceFile.  (Devaraj Das via cutting)
+
 
 Release 0.8.0 - 2006-11-03
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=473481&r1=473480&r2=473481
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Fri Nov 10 13:11:55 2006
@@ -853,6 +853,7 @@
 
     private long end;
     private int keyLength;
+    private int recordLength;
 
     private boolean decompress;
     private boolean blockCompressed;
@@ -989,9 +990,6 @@
         valLenInFilter = this.codec.createInputStream(valLenBuffer);
         valLenIn = new DataInputStream(valLenInFilter);
       }
-      
-
-      lazyDecompress = conf.getBoolean("io.seqfile.lazydecompress", true);
     }
     
     /** Close the file. */
@@ -1323,6 +1321,82 @@
       
     }
 
+    /**
+     * Read 'raw' keys.
+     * @param key - The buffer into which the key is read
+     * @return Returns the key length
+     * @throws IOException
+     */
+    public int nextRawKey(DataOutputBuffer key) 
+    throws IOException {
+      if (!blockCompressed) {
+        if (in.getPos() >= end) 
+          return -1;
+
+        recordLength = checkAndReadSync(in.readInt());
+        keyLength = in.readInt();
+        key.write(in, keyLength);
+        return keyLength;
+      } else {
+        //Reset syncSeen
+        syncSeen = false;
+        
+        // Read 'key'
+        if (noBufferedKeys == 0) {
+          if (in.getPos() >= end) 
+            return -1;
+
+          try { 
+            readBlock();
+          } catch (EOFException eof) {
+            return -1;
+          }
+        }
+        int keyLength = WritableUtils.readVInt(keyLenIn);
+        if (keyLength < 0) {
+          throw new IOException("zero length key found!");
+        }
+        key.write(keyIn, keyLength);
+        --noBufferedKeys;
+        
+        return keyLength;
+      }
+      
+    }
+
+    /**
+     * Read 'raw' values.
+     * @param val - The 'raw' value
+     * @return Returns the value length
+     * @throws IOException
+     */
+    public int nextRawValue(ValueBytes val) 
+    throws IOException {
+      
+      // Position stream to current value
+      seekToCurrentValue();
+ 
+      if (!blockCompressed) {
+        int valLength = recordLength - keyLength;
+        if (decompress) {
+          CompressedBytes value = (CompressedBytes)val;
+          value.reset(in, valLength);
+        } else {
+          UncompressedBytes value = (UncompressedBytes)val;
+          value.reset(in, valLength);
+        }
+         
+        return valLength;
+      } else {
+        int valLength = WritableUtils.readVInt(valLenIn);
+        UncompressedBytes rawValue = (UncompressedBytes)val;
+        rawValue.reset(valIn, valLength);
+        --noBufferedValues;
+        return valLength;
+      }
+      
+    }
+
     private void handleChecksumException(ChecksumException e)
       throws IOException {
       if (this.conf.getBoolean("io.skip.checksum.errors", false)) {
@@ -1459,16 +1533,8 @@
       this.outFile = outFile;
 
       int segments = sortPass(deleteInput);
-      int pass = 1;
-      while (segments > 1) {
-        segments = mergePass(pass, segments <= factor);
-        pass++;
-      }
-      
-      // Clean up intermediate files
-      for (int i=0; i < pass; ++i) {
-        fs.delete(new Path(outFile.toString() + "." + i));
-        fs.delete(new Path(outFile.toString() + "." + i + ".index"));
+      if (segments > 1) {
+        segments = mergePass();
       }
     }
 
@@ -1712,128 +1778,377 @@
       }
     } // SequenceFile.Sorter.SortPass
 
-    private int mergePass(int pass, boolean last) throws IOException {
-      LOG.debug("running merge pass=" + pass);
-      MergePass mergePass = new MergePass(pass, last);
-      try {                                       // make a merge pass
-        return mergePass.run();                  // run it
-      } finally {
-        mergePass.close();                       // close it
-      }
+    /** The interface to iterate over raw keys/values of SequenceFiles. */
+    public static interface RawKeyValueIterator {
+      /** Gets the current raw key
+       * @return DataOutputBuffer
+       * @throws IOException
+       */
+      DataOutputBuffer getKey() throws IOException; 
+      /** Gets the current raw value
+       * @return ValueBytes 
+       * @throws IOException
+       */
+      ValueBytes getValue() throws IOException; 
+      /** Sets up the current key and value (for getKey and getValue)
+       * @return true if there exists a key/value, false otherwise 
+       * @throws IOException
+       */
+      boolean next() throws IOException;
+      /** closes the iterator so that the underlying streams can be closed
+       * @throws IOException
+       */
+      void close() throws IOException;
+    }    
+    
+  /**
+   * Merges the list of segments of type <code>SegmentDescriptor</code>
+   * @param segments the list of SegmentDescriptors
+   * @return RawKeyValueIterator
+   * @throws IOException
+   */
+    public RawKeyValueIterator merge(List <SegmentDescriptor> segments) 
+    throws IOException {
+      MergeQueue mQueue = new MergeQueue(segments);
+      return mQueue.merge();
     }
 
-    private class MergePass {
-      private boolean last;
-
-      private MergeQueue queue;
-      private FSDataInputStream in = null;
-      private Path inName;
-      private FSDataInputStream indexIn = null;
-
-      public MergePass(int pass, boolean last) throws IOException {
-        this.last = last;
-
-        this.queue =
-          new MergeQueue(factor, last?outFile:outFile.suffix("."+pass), last);
-
-        this.inName = outFile.suffix("."+(pass-1));
-        this.in = fs.open(inName);
-        this.indexIn = fs.open(inName.suffix(".index"));
-      }
+    /**
+     * Merges the contents of files passed in Path[]
+     * @param inNames the array of path names
+     * @param deleteInputs true if the input files should be deleted when 
+     * unnecessary
+     * @return RawKeyValueIterator
+     * @throws IOException
+     */
+    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs) 
+    throws IOException {
+      //get the segments from inNames
+      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
+      for (int i = 0; i < inNames.length; i++) {
+        SegmentDescriptor s = new SegmentDescriptor(0, 
+                              fs.getLength(inNames[i]), inNames[i]);
+        s.preserveInput(!deleteInputs);
+        s.doSync();
+        a.add(s);
+      }
+      factor = inNames.length;
+      MergeQueue mQueue = new MergeQueue(a);
+      return mQueue.merge();
+    }
 
-      public void close() throws IOException {
-        in.close();                               // close and delete input
-        fs.delete(inName);
+    /**
+     * Clones the attributes (like compression of the input file and creates a 
+     * corresponding Writer
+     * @param FileSystem
+     * @param inputFile the path of the input file whose attributes should be 
+     * cloned 
+     * @param outputFile the path of the output file 
+     * @param prog the Progressable to report status during the file write
+     * @return Writer
+     * @throws IOException
+     */
+    public Writer cloneFileAttributes(FileSystem fileSys, Path inputFile, 
+                  Path outputFile, Progressable prog) throws IOException {
+      Reader reader = new Reader(fileSys, inputFile, memory/(factor+1), conf);
+      boolean compress = reader.isCompressed();
+      boolean blockCompress = reader.isBlockCompressed();
+      CompressionCodec codec = reader.getCompressionCodec();
+      reader.close();
+      FSDataOutputStream out;
+      if (prog != null)
+        out = fs.create(outputFile, true, memory/(factor+1), prog);
+      else
+        out = fs.create(outputFile, true, memory/(factor+1));
+      Writer writer = createWriter(out, keyClass, valClass, compress, 
+                          blockCompress, codec);
+      return writer;
+    }
 
-        queue.close();                            // close queue
+    /**
+     * Writes records from RawKeyValueIterator into a file represented by the 
+     * passed writer
+     * @param records the RawKeyValueIterator
+     * @param writer the Writer created earlier 
+     * @throws IOException
+     */
+    public void writeFile(RawKeyValueIterator records, Writer writer) 
+    throws IOException {
+      while(records.next()) {
+        writer.appendRaw(records.getKey().getData(), 0, 
+                         records.getKey().getLength(), records.getValue());
       }
-
-      public int run() throws IOException {
-        int segments = 0;
-        long end = fs.getLength(inName);
-
-        while (in.getPos() < end) {
-          LOG.debug("merging segment " + segments);
-          long segmentStart = queue.out.getPos();
-          while (in.getPos() < end && queue.size() < factor) {
-            long segmentOffset = WritableUtils.readVLong(indexIn);
-            long segmentLength = WritableUtils.readVLong(indexIn);
-            Reader reader = new Reader(fs, inName, memory/(factor+1),
-                                        segmentOffset, segmentLength, conf);
-            reader.sync = null;                   // disable sync on temp files
-
-            MergeStream ms = new MergeStream(reader); // add segment to queue
-            if (ms.next()) {
-              queue.put(ms);
-            }
-            in.seek(reader.end);
-          }
-
-          queue.merge();                          // do a merge
-
-          if (!last) {
-            WritableUtils.writeVLong(queue.indexOut, segmentStart);
-            WritableUtils.writeVLong(queue.indexOut, 
-                (queue.out.getPos() - segmentStart));
-          }
-          
-          segments++;
-        }
-
-        return segments;
+      if (writer instanceof SequenceFile.BlockCompressWriter) {
+        SequenceFile.BlockCompressWriter bcWriter =
+                        (SequenceFile.BlockCompressWriter) writer;
+        bcWriter.writeBlock();
       }
-    } // SequenceFile.Sorter.MergePass
-
-    /** Merge the provided files.*/
+    }
+        
+    /** Merge the provided files.
+     * @param inFiles the array of input path names
+     * @param outFile the final output file
+     * @throws IOException
+     */
     public void merge(Path[] inFiles, Path outFile) throws IOException {
-      this.inFiles = inFiles;
-      this.outFile = outFile;
-      this.factor = inFiles.length;
-
       if (fs.exists(outFile)) {
         throw new IOException("already exists: " + outFile);
       }
+      RawKeyValueIterator r = merge(inFiles, false);
+      Writer writer = cloneFileAttributes(fs, 
+              inFiles[0], outFile, null);
+      
+      writeFile(r, writer);
 
-      MergeFiles mergeFiles = new MergeFiles();
-      try {                                       // make a merge pass
-        mergeFiles.run();                         // run it
-      } finally {
-        mergeFiles.close();                       // close it
-      }
+      writer.close();
     }
 
-    private class MergeFiles {
-      private MergeQueue queue;
+    /** sort calls this to generate the final merged output */
+    private int mergePass() throws IOException {
+      LOG.debug("running merge pass");
+      Writer writer = cloneFileAttributes(fs, 
+              outFile.suffix(".0"), outFile, null);
+      RawKeyValueIterator r = merge(outFile.suffix(".0"), 
+                                    outFile.suffix(".0.index"));
+      writeFile(r, writer);
 
-      public MergeFiles() throws IOException {
-        this.queue = new MergeQueue(factor, outFile, true);
-      }
+      writer.close();
+      return 0;
+    }
 
+    /** Used by mergePass to merge the output of the sort
+     * @param inName the name of the input file containing sorted segments
+     * @param indexIn the offsets of the sorted segments
+     * @return RawKeyValueIterator
+     * @throws IOException
+     */
+    private RawKeyValueIterator merge(Path inName, Path indexIn) 
+    throws IOException {
+      //get the segments from indexIn
+      //we create a SegmentContainer so that we can track segments belonging to
+      //inName and delete inName as soon as we see that we have looked at all
+      //the contained segments during the merge process & hence don't need 
+      //them anymore
+      SegmentContainer container = new SegmentContainer(inName, indexIn);
+      MergeQueue mQueue = new MergeQueue(container.getSegmentList());
+      return mQueue.merge();
+    }
+    
+    /** This class implements the core of the merge logic */
+    private class MergeQueue extends PriorityQueue 
+    implements RawKeyValueIterator {
+      private boolean compress;
+      private boolean blockCompress;
+      private DataOutputBuffer rawKey = new DataOutputBuffer();
+      private ValueBytes rawValue;
+      
+      //a TreeMap used to store the segments sorted by size (segment offset and
+      //segment path name is used to break ties between segments of same sizes)
+      private Map <SegmentDescriptor, Void> sortedSegmentSizes = new TreeMap();
+            
+      public void put(SegmentDescriptor stream) throws IOException {
+        if (size() == 0) {
+          compress = stream.in.isCompressed();
+          blockCompress = stream.in.isBlockCompressed();
+        } else if (compress != stream.in.isCompressed() || 
+            blockCompress != stream.in.isBlockCompressed()) {
+          throw new IOException("All merged files must be compressed or not.");
+        } 
+        super.put(stream);
+      }
+      
+      public MergeQueue(List <SegmentDescriptor> segments) {
+        int size = segments.size();
+        for (int i = 0; i < size; i++) {
+          sortedSegmentSizes.put(segments.get(i), null);
+        }
+      }
+      protected boolean lessThan(Object a, Object b) {
+        SegmentDescriptor msa = (SegmentDescriptor)a;
+        SegmentDescriptor msb = (SegmentDescriptor)b;
+        return comparator.compare(msa.getKey().getData(), 0, 
+            msa.getKey().getLength(), msb.getKey().getData(), 0, 
+            msb.getKey().getLength()) < 0;
+      }
       public void close() throws IOException {
-        queue.close();
+        SegmentDescriptor ms;                           // close inputs
+        while ((ms = (SegmentDescriptor)pop()) != null) {
+          ms.cleanup();
+        }
       }
+      public DataOutputBuffer getKey() throws IOException {
+        return rawKey;
+      }
+      public ValueBytes getValue() throws IOException {
+        return rawValue;
+      }
+      public boolean next() throws IOException {
+        if (size() == 0)
+          return false;
+        SegmentDescriptor ms = (SegmentDescriptor)top();
+        //save the raw key
+        rawKey.reset();
+        rawKey.write(ms.getKey().getData(), 0, ms.getKey().getLength());
+        //load the raw value. Re-use the existing rawValue buffer
+        if(rawValue == null)
+          rawValue = ms.in.createValueBytes();
+        ms.nextRawValue(rawValue);
 
-      public void run() throws IOException {
-        LOG.debug("merging files=" + inFiles.length);
-        for (int i = 0; i < inFiles.length; i++) {
-          Path inFile = inFiles[i];
-          MergeStream ms =
-            new MergeStream(new Reader(fs, inFile, memory/(factor+1), conf));
-          if (ms.next())
-            queue.put(ms);
+        if (ms.nextRawKey()) {
+          adjustTop();
+        } else {
+          pop();
+          ms.cleanup();
         }
-
-        queue.merge();
+        return true;
       }
-    } // SequenceFile.Sorter.MergeFiles
+      
+      /** This is the single level merge that is called multiple times 
+       * depending on the factor size and the number of segments
+       * @return RawKeyValueIterator
+       * @throws IOException
+       */
+      public RawKeyValueIterator merge() throws IOException {
+        //create the MergeStreams from the sorted map created in the constructor
+        //and dump the final output to a file
+        int numSegments = sortedSegmentSizes.size();
+        int origFactor = factor;
+        int passNo = 1;
+        do {
+          //get the factor for this pass of merge
+          factor = getPassFactor(passNo, numSegments);
+          //extract the smallest 'factor' number of segment pointers from the 
+          //TreeMap
+          SegmentDescriptor[] mStream = getSegmentDescriptors(factor);
+          
+          //feed the streams to the priority queue
+          initialize(mStream.length); clear();
+          for (int i = 0; i < mStream.length; i++) {
+            if (mStream[i].nextRawKey()) put(mStream[i]);
+          }
+          //if we have lesser number of segments remaining, then just return the
+          //iterator, else do another single level merge
+          if (numSegments <= factor) {
+            return this;
+          } else {
+            //we want to spread the creation of temp files on multiple disks if 
+            //available
+            Path outputFile = conf.getLocalPath("mapred.local.dir", 
+                                  (outFile.suffix("." + passNo)).toString());
+            Writer writer = cloneFileAttributes(fs, 
+                            mStream[0].segmentPathName, outputFile, null);
+            writer.sync = null; //disable sync for temp files
+            writeFile(this, writer);
+            writer.close();
+            
+            //we finished one single level merge; now clean up the priority 
+            //queue
+            this.close();
+            
+            SegmentDescriptor tempSegment = 
+                 new SegmentDescriptor(0, fs.getLength(outputFile), outputFile);
+            //put the segment back in the TreeMap
+            sortedSegmentSizes.put(tempSegment, null);
+            numSegments = sortedSegmentSizes.size();
+            passNo++;
+          }
+          //we are worried about only the first pass merge factor. So reset the 
+          //factor to what it originally was
+          factor = origFactor;
+        } while(true);
+      }
+  
+      //Hadoop-591
+      public int getPassFactor(int passNo, int numSegments) {
+        if (passNo > 1 || numSegments <= factor || factor == 1) 
+          return factor;
+        int mod = (numSegments - 1) % (factor - 1);
+        if (mod == 0)
+          return factor;
+        return mod + 1;
+      }
+      
+      /** Return (& remove) the requested number of segment descriptors from the
+       * sorted map.
+       */
+      public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) {
+        if (numDescriptors > sortedSegmentSizes.size())
+          numDescriptors = sortedSegmentSizes.size();
+        SegmentDescriptor[] SegmentDescriptors = 
+                                   new SegmentDescriptor[numDescriptors];
+        Iterator iter = sortedSegmentSizes.keySet().iterator();
+        int i = 0;
+        while (i < numDescriptors) {
+          SegmentDescriptors[i++] = (SegmentDescriptor)iter.next();
+          iter.remove();
+        }
+        return SegmentDescriptors;
+      }
+    } // SequenceFile.Sorter.MergeQueue
 
-    private class MergeStream {
-      private Reader in;
+    /** This class defines a merge segment. This class can be subclassed to 
+     * provide a customized cleanup method implementation. In this 
+     * implementation, cleanup closes the file handle and deletes the file 
+     */
+    public class SegmentDescriptor implements Comparable {
+      
+      long segmentOffset; //the start of the segment in the file
+      long segmentLength; //the length of the segment
+      Path segmentPathName; //the path name of the file containing the segment
+      boolean ignoreSync = true; //set to true for temp files
+      private Reader in = null; 
+      private DataOutputBuffer rawKey = null; //this will hold the current key
+      private boolean preserveInput = false; //delete input segment files?
+      
+      /** Constructs a segment
+       * @param segmentOffset the offset of the segment in the file
+       * @param segmentLength the length of the segment
+       * @param segmentPathName the path name of the file containing the segment
+       */
+      public SegmentDescriptor (long segmentOffset, long segmentLength, 
+              Path segmentPathName) {
+        this.segmentOffset = segmentOffset;
+        this.segmentLength = segmentLength;
+        this.segmentPathName = segmentPathName;
+      }
+      
+      /** Do the sync checks */
+      public void doSync() {ignoreSync = false;}
+      
+      /** Whether to delete the files when no longer needed */
+      public void preserveInput(boolean preserve) {
+        preserveInput = preserve;
+      }
 
-      private DataOutputBuffer rawKey = null;
-      private ValueBytes rawValue = null;
+      public boolean shouldPreserveInput() {
+        return preserveInput;
+      }
       
-      public MergeStream(Reader reader) throws IOException {
+      public int compareTo(Object o) {
+        SegmentDescriptor that = (SegmentDescriptor)o;
+        if (this.segmentLength != that.segmentLength) {
+          return (this.segmentLength < that.segmentLength ? -1 : 1);
+        }
+        if (this.segmentOffset != that.segmentOffset) {
+          return (this.segmentOffset < that.segmentOffset ? -1 : 1);
+        }
+        return (this.segmentPathName.toString()).
+                compareTo(that.segmentPathName.toString());
+      }
+
+      /** Fills up the rawKey object with the key returned by the Reader
+       * @return true if there is a key returned; false, otherwise
+       * @throws IOException
+       */
+      public boolean nextRawKey() throws IOException {
+        if (in == null) {
+        Reader reader = new Reader(fs, segmentPathName, 
+                memory/(factor+1), segmentOffset, 
+                segmentLength, conf);
+        
+        //sometimes we ignore syncs especially for temp merge files
+        if (ignoreSync) reader.sync = null;
+
         if (reader.keyClass != keyClass)
           throw new IOException("wrong key class: " + reader.getKeyClass() +
                                 " is not " + keyClass);
@@ -1842,98 +2157,114 @@
                                 " is not " + valClass);
         this.in = reader;
         rawKey = new DataOutputBuffer();
-        rawValue = in.createValueBytes();
-      }
-
-      public boolean next() throws IOException {
+        }
         rawKey.reset();
-        int recordLength = 
-          in.nextRaw(rawKey, rawValue);
-        return (recordLength >= 0);
+        int keyLength = 
+          in.nextRawKey(rawKey);
+        return (keyLength >= 0);
       }
-    } // SequenceFile.Sorter.MergeStream
 
-    private class MergeQueue extends PriorityQueue {
-      private Path outName;
-      private FSDataOutputStream out;
-      private FSDataOutputStream indexOut;
-      private boolean done;
-      private boolean compress;
-      private boolean blockCompress;
-      private CompressionCodec codec = null;
-
-      public void put(MergeStream stream) throws IOException {
-        if (size() == 0) {
-          compress = stream.in.isCompressed();
-          blockCompress = stream.in.isBlockCompressed();
-          codec = stream.in.getCompressionCodec();
-        } else if (compress != stream.in.isCompressed() || 
-            blockCompress != stream.in.isBlockCompressed()) {
-          throw new IOException("All merged files must be compressed or not.");
-        } 
-        super.put(stream);
+      /** Fills up the passed rawValue with the value corresponding to the key
+       * read earlier
+       * @param rawValue
+       * @return the length of the value
+       * @throws IOException
+       */
+      public int nextRawValue(ValueBytes rawValue) throws IOException {
+        int valLength = in.nextRawValue(rawValue);
+        return valLength;
       }
-
-      public MergeQueue(int size, Path outName, boolean done)
-        throws IOException {
-        initialize(size);
-        this.outName = outName;
-        this.out = fs.create(this.outName, true, memory/(factor+1));
-        if (!done) {
-          this.indexOut = fs.create(outName.suffix(".index"), true, 
-              memory/(factor+1));
-        }
-        this.done = done;
+      
+      /** Returns the stored rawKey */
+      public DataOutputBuffer getKey() {
+        return rawKey;
       }
-
-      protected boolean lessThan(Object a, Object b) {
-        MergeStream msa = (MergeStream)a;
-        MergeStream msb = (MergeStream)b;
-        return comparator.compare(msa.rawKey.getData(), 0, msa.rawKey.getLength(),
-            msb.rawKey.getData(), 0, msb.rawKey.getLength()) < 0;
+      
+      /** closes the underlying reader */
+      private void close() throws IOException {
+        this.in.close();
       }
 
-      public void merge() throws IOException {
-        Writer writer = createWriter(out, keyClass, valClass, 
-            compress, blockCompress, codec);
-        if (!done) {
-          writer.sync = null;                     // disable sync on temp files
+      /** The default cleanup. Subclasses can override this with a custom 
+       * cleanup 
+       */
+      public void cleanup() throws IOException {
+        close();
+        if (!preserveInput) {
+          fs.delete(segmentPathName);
         }
+      }
+    } // SequenceFile.Sorter.SegmentDescriptor
+    
+    /** This class provisions multiple segments contained within a single
+     *  file
+     */
+    private class LinkedSegmentsDescriptor extends SegmentDescriptor {
 
-        while (size() != 0) {
-          MergeStream ms = (MergeStream)top();
-          writer.appendRaw(ms.rawKey.getData(), 0, ms.rawKey.getLength(), 
-              ms.rawValue);                       // write top entry
-          
-          if (ms.next()) {                        // has another entry
-            adjustTop();
-          } else {
-            pop();                                // done with this file
-            ms.in.close();
-          }
-        }
+      SegmentContainer parentContainer = null;
 
-        if (writer instanceof SequenceFile.BlockCompressWriter) {
-          SequenceFile.BlockCompressWriter bcWriter = 
-            (SequenceFile.BlockCompressWriter) writer;
-          bcWriter.writeBlock();
-        }
-        out.flush();
+      /** Constructs a segment
+       * @param segmentOffset the offset of the segment in the file
+       * @param segmentLength the length of the segment
+       * @param segmentPathName the path name of the file containing the segment
+       * @param parent the parent SegmentContainer that holds the segment
+       */
+      public LinkedSegmentsDescriptor (long segmentOffset, long segmentLength, 
+              Path segmentPathName, SegmentContainer parent) {
+        super(segmentOffset, segmentLength, segmentPathName);
+        this.parentContainer = parent;
+      }
+      /** The default cleanup. Subclasses can override this with a custom 
+       * cleanup 
+       */
+      public void cleanup() throws IOException {
+        super.close();
+        if (super.shouldPreserveInput()) return;
+        parentContainer.cleanup();
+      }
+    } //SequenceFile.Sorter.LinkedSegmentsDescriptor
+
+    /** The class that defines a container for segments to be merged. Primarily
+     * required to delete temp files as soon as all the contained segments
+     * have been looked at */
+    private class SegmentContainer {
+      private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups
+      private int numSegmentsContained; //# of segments contained
+      private Path inName; //input file from where segments are created
+      
+      //the list of segments read from the file
+      private ArrayList <SegmentDescriptor> segments = 
+                                   new ArrayList <SegmentDescriptor>();
+      /** This constructor is there primarily to serve the sort routine that 
+       * generates a single output file with an associated index file */
+      public SegmentContainer(Path inName, Path indexIn) throws IOException {
+        //get the segments from indexIn
+        FSDataInputStream fsIndexIn = fs.open(indexIn);
+        long end = fs.getLength(indexIn);
+        while (fsIndexIn.getPos() < end) {
+          long segmentOffset = WritableUtils.readVLong(fsIndexIn);
+          long segmentLength = WritableUtils.readVLong(fsIndexIn);
+          Path segmentName = inName;
+          segments.add(new LinkedSegmentsDescriptor(segmentOffset, 
+                                 segmentLength, segmentName, this));
+        }
+        fsIndexIn.close();
+        fs.delete(indexIn);
+        numSegmentsContained = segments.size();
+        this.inName = inName;
       }
 
-      public void close() throws IOException {
-        MergeStream ms;                           // close inputs
-        while ((ms = (MergeStream)pop()) != null) {
-          ms.in.close();
-        }
-        out.close();                              // close output
-        if (indexOut != null) {
-          indexOut.close();
+      public List <SegmentDescriptor> getSegmentList() {
+        return segments;
+      }
+      public void cleanup() throws IOException {
+        numSegmentsCleanedUp++;
+        if (numSegmentsCleanedUp == numSegmentsContained) {
+          fs.delete(inName);
         }
       }
-      
-    } // SequenceFile.Sorter.MergeQueue
-    
+    } //SequenceFile.Sorter.SegmentContainer
+
   } // SequenceFile.Sorter
 
 } // SequenceFile