Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/11/10 22:11:56 UTC
svn commit: r473481 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/io/SequenceFile.java
Author: cutting
Date: Fri Nov 10 13:11:55 2006
New Revision: 473481
URL: http://svn.apache.org/viewvc?view=rev&rev=473481
Log:
HADOOP-611. Add support for iterator-based merging to SequenceFile. Contributed by Devaraj.
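In short, the Sorter can now expose merged output as a pull-style iterator
(RawKeyValueIterator) rather than always writing intermediate files. A minimal
usage sketch, assuming hypothetical input paths and Text/IntWritable records;
the Sorter constructor, merge(Path[], boolean), cloneFileAttributes and
writeFile calls are the APIs added or touched by this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class IteratorMergeSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // Hypothetical sorted inputs; key/value classes must match the files.
        Path[] inputs = { new Path("part-0"), new Path("part-1") };
        SequenceFile.Sorter sorter =
          new SequenceFile.Sorter(fs, Text.class, IntWritable.class, conf);

        // merge() now returns an iterator over raw keys/values instead of
        // eagerly writing a merged file (false = keep the input files).
        SequenceFile.Sorter.RawKeyValueIterator iter =
          sorter.merge(inputs, false);

        // Write the merged stream out, cloning compression settings from
        // the first input.
        SequenceFile.Writer writer = sorter.cloneFileAttributes(
          fs, inputs[0], new Path("merged"), null);
        sorter.writeFile(iter, writer);
        writer.close();
      }
    }

The iterator can also be consumed in place (call next(), then getKey() and
getValue() for the current raw key/value) without ever materializing a file.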
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=473481&r1=473480&r2=473481
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Nov 10 13:11:55 2006
@@ -41,6 +41,9 @@
12. HADOOP-696. Fix TestTextInputFormat unit test to not rely on the
order of directory listings. (Sameer Paranjpye via cutting)
+13. HADOOP-611. Add support for iterator-based merging to
+ SequenceFile. (Devaraj Das via cutting)
+
Release 0.8.0 - 2006-11-03
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=473481&r1=473480&r2=473481
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Fri Nov 10 13:11:55 2006
@@ -853,6 +853,7 @@
private long end;
private int keyLength;
+ private int recordLength;
private boolean decompress;
private boolean blockCompressed;
@@ -989,9 +990,6 @@
valLenInFilter = this.codec.createInputStream(valLenBuffer);
valLenIn = new DataInputStream(valLenInFilter);
}
-
-
- lazyDecompress = conf.getBoolean("io.seqfile.lazydecompress", true);
}
/** Close the file. */
@@ -1323,6 +1321,82 @@
}
+ /**
+ * Read 'raw' keys.
+ * @param key - The buffer into which the key is read
+ * @return Returns the key length
+ * @throws IOException
+ */
+ public int nextRawKey(DataOutputBuffer key)
+ throws IOException {
+ if (!blockCompressed) {
+ if (in.getPos() >= end)
+ return -1;
+
+ recordLength = checkAndReadSync(in.readInt());
+ keyLength = in.readInt();
+ key.write(in, keyLength);
+ return keyLength;
+ } else {
+ //Reset syncSeen
+ syncSeen = false;
+
+ // Read 'key'
+ if (noBufferedKeys == 0) {
+ if (in.getPos() >= end)
+ return -1;
+
+ try {
+ readBlock();
+ } catch (EOFException eof) {
+ return -1;
+ }
+ }
+ int keyLength = WritableUtils.readVInt(keyLenIn);
+ if (keyLength < 0) {
+ throw new IOException("negative length key found: " + keyLength);
+ }
+ key.write(keyIn, keyLength);
+ --noBufferedKeys;
+
+ return keyLength;
+ }
+
+ }
+
+ /**
+ * Read 'raw' values.
+ * @param val - The 'raw' value
+ * @return Returns the value length
+ * @throws IOException
+ */
+ public int nextRawValue(ValueBytes val)
+ throws IOException {
+
+ // Position stream to current value
+ seekToCurrentValue();
+
+ if (!blockCompressed) {
+ int valLength = recordLength - keyLength;
+ if (decompress) {
+ CompressedBytes value = (CompressedBytes)val;
+ value.reset(in, valLength);
+ } else {
+ UncompressedBytes value = (UncompressedBytes)val;
+ value.reset(in, valLength);
+ }
+
+ return valLength;
+ } else {
+ int valLength = WritableUtils.readVInt(valLenIn);
+ UncompressedBytes rawValue = (UncompressedBytes)val;
+ rawValue.reset(valIn, valLength);
+ --noBufferedValues;
+ return valLength;
+ }
+
+ }
+
private void handleChecksumException(ChecksumException e)
throws IOException {
if (this.conf.getBoolean("io.skip.checksum.errors", false)) {
@@ -1459,16 +1533,8 @@
this.outFile = outFile;
int segments = sortPass(deleteInput);
- int pass = 1;
- while (segments > 1) {
- segments = mergePass(pass, segments <= factor);
- pass++;
- }
-
- // Clean up intermediate files
- for (int i=0; i < pass; ++i) {
- fs.delete(new Path(outFile.toString() + "." + i));
- fs.delete(new Path(outFile.toString() + "." + i + ".index"));
+ if (segments > 1) {
+ segments = mergePass();
}
}
@@ -1712,128 +1778,377 @@
}
} // SequenceFile.Sorter.SortPass
- private int mergePass(int pass, boolean last) throws IOException {
- LOG.debug("running merge pass=" + pass);
- MergePass mergePass = new MergePass(pass, last);
- try { // make a merge pass
- return mergePass.run(); // run it
- } finally {
- mergePass.close(); // close it
- }
+ /** The interface to iterate over raw keys/values of SequenceFiles. */
+ public static interface RawKeyValueIterator {
+ /** Gets the current raw key
+ * @return DataOutputBuffer
+ * @throws IOException
+ */
+ DataOutputBuffer getKey() throws IOException;
+ /** Gets the current raw value
+ * @return ValueBytes
+ * @throws IOException
+ */
+ ValueBytes getValue() throws IOException;
+ /** Sets up the current key and value (for getKey and getValue)
+ * @return true if there exists a key/value, false otherwise
+ * @throws IOException
+ */
+ boolean next() throws IOException;
+ /** closes the iterator so that the underlying streams can be closed
+ * @throws IOException
+ */
+ void close() throws IOException;
+ }
+
+ /**
+ * Merges the list of segments of type <code>SegmentDescriptor</code>
+ * @param segments the list of SegmentDescriptors
+ * @return RawKeyValueIterator
+ * @throws IOException
+ */
+ public RawKeyValueIterator merge(List <SegmentDescriptor> segments)
+ throws IOException {
+ MergeQueue mQueue = new MergeQueue(segments);
+ return mQueue.merge();
}
- private class MergePass {
- private boolean last;
-
- private MergeQueue queue;
- private FSDataInputStream in = null;
- private Path inName;
- private FSDataInputStream indexIn = null;
-
- public MergePass(int pass, boolean last) throws IOException {
- this.last = last;
-
- this.queue =
- new MergeQueue(factor, last?outFile:outFile.suffix("."+pass), last);
-
- this.inName = outFile.suffix("."+(pass-1));
- this.in = fs.open(inName);
- this.indexIn = fs.open(inName.suffix(".index"));
- }
+ /**
+ * Merges the contents of files passed in Path[]
+ * @param inNames the array of path names
+ * @param deleteInputs true if the input files should be deleted once they
+ * are no longer needed
+ * @return RawKeyValueIterator
+ * @throws IOException
+ */
+ public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs)
+ throws IOException {
+ //get the segments from inNames
+ ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
+ for (int i = 0; i < inNames.length; i++) {
+ SegmentDescriptor s = new SegmentDescriptor(0,
+ fs.getLength(inNames[i]), inNames[i]);
+ s.preserveInput(!deleteInputs);
+ s.doSync();
+ a.add(s);
+ }
+ factor = inNames.length;
+ MergeQueue mQueue = new MergeQueue(a);
+ return mQueue.merge();
+ }
- public void close() throws IOException {
- in.close(); // close and delete input
- fs.delete(inName);
+ /**
+ * Clones the attributes (like compression) of the input file and creates a
+ * corresponding Writer
+ * @param fileSys the FileSystem to use
+ * @param inputFile the path of the input file whose attributes should be
+ * cloned
+ * @param outputFile the path of the output file
+ * @param prog the Progressable to report status during the file write
+ * @return Writer
+ * @throws IOException
+ */
+ public Writer cloneFileAttributes(FileSystem fileSys, Path inputFile,
+ Path outputFile, Progressable prog) throws IOException {
+ Reader reader = new Reader(fileSys, inputFile, memory/(factor+1), conf);
+ boolean compress = reader.isCompressed();
+ boolean blockCompress = reader.isBlockCompressed();
+ CompressionCodec codec = reader.getCompressionCodec();
+ reader.close();
+ FSDataOutputStream out;
+ if (prog != null)
+ out = fs.create(outputFile, true, memory/(factor+1), prog);
+ else
+ out = fs.create(outputFile, true, memory/(factor+1));
+ Writer writer = createWriter(out, keyClass, valClass, compress,
+ blockCompress, codec);
+ return writer;
+ }
- queue.close(); // close queue
+ /**
+ * Writes records from RawKeyValueIterator into a file represented by the
+ * passed writer
+ * @param records the RawKeyValueIterator
+ * @param writer the Writer created earlier
+ * @throws IOException
+ */
+ public void writeFile(RawKeyValueIterator records, Writer writer)
+ throws IOException {
+ while(records.next()) {
+ writer.appendRaw(records.getKey().getData(), 0,
+ records.getKey().getLength(), records.getValue());
}
-
- public int run() throws IOException {
- int segments = 0;
- long end = fs.getLength(inName);
-
- while (in.getPos() < end) {
- LOG.debug("merging segment " + segments);
- long segmentStart = queue.out.getPos();
- while (in.getPos() < end && queue.size() < factor) {
- long segmentOffset = WritableUtils.readVLong(indexIn);
- long segmentLength = WritableUtils.readVLong(indexIn);
- Reader reader = new Reader(fs, inName, memory/(factor+1),
- segmentOffset, segmentLength, conf);
- reader.sync = null; // disable sync on temp files
-
- MergeStream ms = new MergeStream(reader); // add segment to queue
- if (ms.next()) {
- queue.put(ms);
- }
- in.seek(reader.end);
- }
-
- queue.merge(); // do a merge
-
- if (!last) {
- WritableUtils.writeVLong(queue.indexOut, segmentStart);
- WritableUtils.writeVLong(queue.indexOut,
- (queue.out.getPos() - segmentStart));
- }
-
- segments++;
- }
-
- return segments;
+ if (writer instanceof SequenceFile.BlockCompressWriter) {
+ SequenceFile.BlockCompressWriter bcWriter =
+ (SequenceFile.BlockCompressWriter) writer;
+ bcWriter.writeBlock();
}
- } // SequenceFile.Sorter.MergePass
-
- /** Merge the provided files.*/
+ }
+
+ /** Merge the provided files.
+ * @param inFiles the array of input path names
+ * @param outFile the final output file
+ * @throws IOException
+ */
public void merge(Path[] inFiles, Path outFile) throws IOException {
- this.inFiles = inFiles;
- this.outFile = outFile;
- this.factor = inFiles.length;
-
if (fs.exists(outFile)) {
throw new IOException("already exists: " + outFile);
}
+ RawKeyValueIterator r = merge(inFiles, false);
+ Writer writer = cloneFileAttributes(fs,
+ inFiles[0], outFile, null);
+
+ writeFile(r, writer);
- MergeFiles mergeFiles = new MergeFiles();
- try { // make a merge pass
- mergeFiles.run(); // run it
- } finally {
- mergeFiles.close(); // close it
- }
+ writer.close();
}
- private class MergeFiles {
- private MergeQueue queue;
+ /** sort calls this to generate the final merged output */
+ private int mergePass() throws IOException {
+ LOG.debug("running merge pass");
+ Writer writer = cloneFileAttributes(fs,
+ outFile.suffix(".0"), outFile, null);
+ RawKeyValueIterator r = merge(outFile.suffix(".0"),
+ outFile.suffix(".0.index"));
+ writeFile(r, writer);
- public MergeFiles() throws IOException {
- this.queue = new MergeQueue(factor, outFile, true);
- }
+ writer.close();
+ return 0;
+ }
+ /** Used by mergePass to merge the output of the sort
+ * @param inName the name of the input file containing sorted segments
+ * @param indexIn the offsets of the sorted segments
+ * @return RawKeyValueIterator
+ * @throws IOException
+ */
+ private RawKeyValueIterator merge(Path inName, Path indexIn)
+ throws IOException {
+ //get the segments from indexIn
+ //we create a SegmentContainer so that we can track segments belonging to
+ //inName and delete inName as soon as we see that we have looked at all
+ //the contained segments during the merge process & hence don't need
+ //them anymore
+ SegmentContainer container = new SegmentContainer(inName, indexIn);
+ MergeQueue mQueue = new MergeQueue(container.getSegmentList());
+ return mQueue.merge();
+ }
+
+ /** This class implements the core of the merge logic */
+ private class MergeQueue extends PriorityQueue
+ implements RawKeyValueIterator {
+ private boolean compress;
+ private boolean blockCompress;
+ private DataOutputBuffer rawKey = new DataOutputBuffer();
+ private ValueBytes rawValue;
+
+ //a TreeMap used to store the segments sorted by size (segment offset and
+ //segment path name are used to break ties between segments of the same size)
+ private Map <SegmentDescriptor, Void> sortedSegmentSizes = new TreeMap();
+
+ public void put(SegmentDescriptor stream) throws IOException {
+ if (size() == 0) {
+ compress = stream.in.isCompressed();
+ blockCompress = stream.in.isBlockCompressed();
+ } else if (compress != stream.in.isCompressed() ||
+ blockCompress != stream.in.isBlockCompressed()) {
+ throw new IOException("All merged files must be compressed or all uncompressed.");
+ }
+ super.put(stream);
+ }
+
+ public MergeQueue(List <SegmentDescriptor> segments) {
+ int size = segments.size();
+ for (int i = 0; i < size; i++) {
+ sortedSegmentSizes.put(segments.get(i), null);
+ }
+ }
+ protected boolean lessThan(Object a, Object b) {
+ SegmentDescriptor msa = (SegmentDescriptor)a;
+ SegmentDescriptor msb = (SegmentDescriptor)b;
+ return comparator.compare(msa.getKey().getData(), 0,
+ msa.getKey().getLength(), msb.getKey().getData(), 0,
+ msb.getKey().getLength()) < 0;
+ }
public void close() throws IOException {
- queue.close();
+ SegmentDescriptor ms; // close inputs
+ while ((ms = (SegmentDescriptor)pop()) != null) {
+ ms.cleanup();
+ }
}
+ public DataOutputBuffer getKey() throws IOException {
+ return rawKey;
+ }
+ public ValueBytes getValue() throws IOException {
+ return rawValue;
+ }
+ public boolean next() throws IOException {
+ if (size() == 0)
+ return false;
+ SegmentDescriptor ms = (SegmentDescriptor)top();
+ //save the raw key
+ rawKey.reset();
+ rawKey.write(ms.getKey().getData(), 0, ms.getKey().getLength());
+ //load the raw value. Re-use the existing rawValue buffer
+ if(rawValue == null)
+ rawValue = ms.in.createValueBytes();
+ ms.nextRawValue(rawValue);
- public void run() throws IOException {
- LOG.debug("merging files=" + inFiles.length);
- for (int i = 0; i < inFiles.length; i++) {
- Path inFile = inFiles[i];
- MergeStream ms =
- new MergeStream(new Reader(fs, inFile, memory/(factor+1), conf));
- if (ms.next())
- queue.put(ms);
+ if (ms.nextRawKey()) {
+ adjustTop();
+ } else {
+ pop();
+ ms.cleanup();
}
-
- queue.merge();
+ return true;
}
- } // SequenceFile.Sorter.MergeFiles
+
+ /** This is the single level merge that is called multiple times
+ * depending on the factor size and the number of segments
+ * @return RawKeyValueIterator
+ * @throws IOException
+ */
+ public RawKeyValueIterator merge() throws IOException {
+ //create the MergeStreams from the sorted map created in the constructor
+ //and dump the final output to a file
+ int numSegments = sortedSegmentSizes.size();
+ int origFactor = factor;
+ int passNo = 1;
+ do {
+ //get the factor for this pass of merge
+ factor = getPassFactor(passNo, numSegments);
+ //extract the smallest 'factor' number of segment pointers from the
+ //TreeMap
+ SegmentDescriptor[] mStream = getSegmentDescriptors(factor);
+
+ //feed the streams to the priority queue
+ initialize(mStream.length); clear();
+ for (int i = 0; i < mStream.length; i++) {
+ if (mStream[i].nextRawKey()) put(mStream[i]);
+ }
+ //if no more segments remain than the merge factor, just return the
+ //iterator; else do another single level merge
+ if (numSegments <= factor) {
+ return this;
+ } else {
+ //we want to spread the creation of temp files on multiple disks if
+ //available
+ Path outputFile = conf.getLocalPath("mapred.local.dir",
+ (outFile.suffix("." + passNo)).toString());
+ Writer writer = cloneFileAttributes(fs,
+ mStream[0].segmentPathName, outputFile, null);
+ writer.sync = null; //disable sync for temp files
+ writeFile(this, writer);
+ writer.close();
+
+ //we finished one single level merge; now clean up the priority
+ //queue
+ this.close();
+
+ SegmentDescriptor tempSegment =
+ new SegmentDescriptor(0, fs.getLength(outputFile), outputFile);
+ //put the segment back in the TreeMap
+ sortedSegmentSizes.put(tempSegment, null);
+ numSegments = sortedSegmentSizes.size();
+ passNo++;
+ }
+ //we are worried about only the first pass merge factor. So reset the
+ //factor to what it originally was
+ factor = origFactor;
+ } while(true);
+ }
+
+ //HADOOP-591: pick the first-pass factor so that every subsequent pass
+ //merges exactly 'factor' segments
+ public int getPassFactor(int passNo, int numSegments) {
+ if (passNo > 1 || numSegments <= factor || factor == 1)
+ return factor;
+ int mod = (numSegments - 1) % (factor - 1);
+ if (mod == 0)
+ return factor;
+ return mod + 1;
+ }
+
+ /** Return (& remove) the requested number of segment descriptors from the
+ * sorted map.
+ */
+ public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) {
+ if (numDescriptors > sortedSegmentSizes.size())
+ numDescriptors = sortedSegmentSizes.size();
+ SegmentDescriptor[] SegmentDescriptors =
+ new SegmentDescriptor[numDescriptors];
+ Iterator iter = sortedSegmentSizes.keySet().iterator();
+ int i = 0;
+ while (i < numDescriptors) {
+ SegmentDescriptors[i++] = (SegmentDescriptor)iter.next();
+ iter.remove();
+ }
+ return SegmentDescriptors;
+ }
+ } // SequenceFile.Sorter.MergeQueue
- private class MergeStream {
- private Reader in;
+ /** This class defines a merge segment. This class can be subclassed to
+ * provide a customized cleanup method implementation. In this
+ * implementation, cleanup closes the file handle and deletes the file
+ */
+ public class SegmentDescriptor implements Comparable {
+
+ long segmentOffset; //the start of the segment in the file
+ long segmentLength; //the length of the segment
+ Path segmentPathName; //the path name of the file containing the segment
+ boolean ignoreSync = true; //set to true for temp files
+ private Reader in = null;
+ private DataOutputBuffer rawKey = null; //this will hold the current key
+ private boolean preserveInput = false; //delete input segment files?
+
+ /** Constructs a segment
+ * @param segmentOffset the offset of the segment in the file
+ * @param segmentLength the length of the segment
+ * @param segmentPathName the path name of the file containing the segment
+ */
+ public SegmentDescriptor (long segmentOffset, long segmentLength,
+ Path segmentPathName) {
+ this.segmentOffset = segmentOffset;
+ this.segmentLength = segmentLength;
+ this.segmentPathName = segmentPathName;
+ }
+
+ /** Do the sync checks */
+ public void doSync() {ignoreSync = false;}
+
+ /** Whether to delete the files when no longer needed */
+ public void preserveInput(boolean preserve) {
+ preserveInput = preserve;
+ }
- private DataOutputBuffer rawKey = null;
- private ValueBytes rawValue = null;
+ public boolean shouldPreserveInput() {
+ return preserveInput;
+ }
- public MergeStream(Reader reader) throws IOException {
+ public int compareTo(Object o) {
+ SegmentDescriptor that = (SegmentDescriptor)o;
+ if (this.segmentLength != that.segmentLength) {
+ return (this.segmentLength < that.segmentLength ? -1 : 1);
+ }
+ if (this.segmentOffset != that.segmentOffset) {
+ return (this.segmentOffset < that.segmentOffset ? -1 : 1);
+ }
+ return (this.segmentPathName.toString()).
+ compareTo(that.segmentPathName.toString());
+ }
+
+ /** Fills up the rawKey object with the key returned by the Reader
+ * @return true if there is a key returned; false, otherwise
+ * @throws IOException
+ */
+ public boolean nextRawKey() throws IOException {
+ if (in == null) {
+ Reader reader = new Reader(fs, segmentPathName,
+ memory/(factor+1), segmentOffset,
+ segmentLength, conf);
+
+ //sometimes we ignore syncs especially for temp merge files
+ if (ignoreSync) reader.sync = null;
+
if (reader.keyClass != keyClass)
throw new IOException("wrong key class: " + reader.getKeyClass() +
" is not " + keyClass);
@@ -1842,98 +2157,114 @@
" is not " + valClass);
this.in = reader;
rawKey = new DataOutputBuffer();
- rawValue = in.createValueBytes();
- }
-
- public boolean next() throws IOException {
+ }
rawKey.reset();
- int recordLength =
- in.nextRaw(rawKey, rawValue);
- return (recordLength >= 0);
+ int keyLength =
+ in.nextRawKey(rawKey);
+ return (keyLength >= 0);
}
- } // SequenceFile.Sorter.MergeStream
- private class MergeQueue extends PriorityQueue {
- private Path outName;
- private FSDataOutputStream out;
- private FSDataOutputStream indexOut;
- private boolean done;
- private boolean compress;
- private boolean blockCompress;
- private CompressionCodec codec = null;
-
- public void put(MergeStream stream) throws IOException {
- if (size() == 0) {
- compress = stream.in.isCompressed();
- blockCompress = stream.in.isBlockCompressed();
- codec = stream.in.getCompressionCodec();
- } else if (compress != stream.in.isCompressed() ||
- blockCompress != stream.in.isBlockCompressed()) {
- throw new IOException("All merged files must be compressed or not.");
- }
- super.put(stream);
+ /** Fills up the passed rawValue with the value corresponding to the key
+ * read earlier
+ * @param rawValue
+ * @return the length of the value
+ * @throws IOException
+ */
+ public int nextRawValue(ValueBytes rawValue) throws IOException {
+ int valLength = in.nextRawValue(rawValue);
+ return valLength;
}
-
- public MergeQueue(int size, Path outName, boolean done)
- throws IOException {
- initialize(size);
- this.outName = outName;
- this.out = fs.create(this.outName, true, memory/(factor+1));
- if (!done) {
- this.indexOut = fs.create(outName.suffix(".index"), true,
- memory/(factor+1));
- }
- this.done = done;
+
+ /** Returns the stored rawKey */
+ public DataOutputBuffer getKey() {
+ return rawKey;
}
-
- protected boolean lessThan(Object a, Object b) {
- MergeStream msa = (MergeStream)a;
- MergeStream msb = (MergeStream)b;
- return comparator.compare(msa.rawKey.getData(), 0, msa.rawKey.getLength(),
- msb.rawKey.getData(), 0, msb.rawKey.getLength()) < 0;
+
+ /** closes the underlying reader */
+ private void close() throws IOException {
+ this.in.close();
}
- public void merge() throws IOException {
- Writer writer = createWriter(out, keyClass, valClass,
- compress, blockCompress, codec);
- if (!done) {
- writer.sync = null; // disable sync on temp files
+ /** The default cleanup. Subclasses can override this with a custom
+ * cleanup
+ */
+ public void cleanup() throws IOException {
+ close();
+ if (!preserveInput) {
+ fs.delete(segmentPathName);
}
+ }
+ } // SequenceFile.Sorter.SegmentDescriptor
+
+ /** This class provisions multiple segments contained within a single
+ * file
+ */
+ private class LinkedSegmentsDescriptor extends SegmentDescriptor {
- while (size() != 0) {
- MergeStream ms = (MergeStream)top();
- writer.appendRaw(ms.rawKey.getData(), 0, ms.rawKey.getLength(),
- ms.rawValue); // write top entry
-
- if (ms.next()) { // has another entry
- adjustTop();
- } else {
- pop(); // done with this file
- ms.in.close();
- }
- }
+ SegmentContainer parentContainer = null;
- if (writer instanceof SequenceFile.BlockCompressWriter) {
- SequenceFile.BlockCompressWriter bcWriter =
- (SequenceFile.BlockCompressWriter) writer;
- bcWriter.writeBlock();
- }
- out.flush();
+ /** Constructs a segment
+ * @param segmentOffset the offset of the segment in the file
+ * @param segmentLength the length of the segment
+ * @param segmentPathName the path name of the file containing the segment
+ * @param parent the parent SegmentContainer that holds the segment
+ */
+ public LinkedSegmentsDescriptor (long segmentOffset, long segmentLength,
+ Path segmentPathName, SegmentContainer parent) {
+ super(segmentOffset, segmentLength, segmentPathName);
+ this.parentContainer = parent;
+ }
+ /** The default cleanup. Subclasses can override this with a custom
+ * cleanup
+ */
+ public void cleanup() throws IOException {
+ super.close();
+ if (super.shouldPreserveInput()) return;
+ parentContainer.cleanup();
+ }
+ } //SequenceFile.Sorter.LinkedSegmentsDescriptor
+
+ /** The class that defines a container for segments to be merged. Primarily
+ * required to delete temp files as soon as all the contained segments
+ * have been looked at */
+ private class SegmentContainer {
+ private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups
+ private int numSegmentsContained; //# of segments contained
+ private Path inName; //input file from where segments are created
+
+ //the list of segments read from the file
+ private ArrayList <SegmentDescriptor> segments =
+ new ArrayList <SegmentDescriptor>();
+ /** This constructor is there primarily to serve the sort routine that
+ * generates a single output file with an associated index file */
+ public SegmentContainer(Path inName, Path indexIn) throws IOException {
+ //get the segments from indexIn
+ FSDataInputStream fsIndexIn = fs.open(indexIn);
+ long end = fs.getLength(indexIn);
+ while (fsIndexIn.getPos() < end) {
+ long segmentOffset = WritableUtils.readVLong(fsIndexIn);
+ long segmentLength = WritableUtils.readVLong(fsIndexIn);
+ Path segmentName = inName;
+ segments.add(new LinkedSegmentsDescriptor(segmentOffset,
+ segmentLength, segmentName, this));
+ }
+ fsIndexIn.close();
+ fs.delete(indexIn);
+ numSegmentsContained = segments.size();
+ this.inName = inName;
}
- public void close() throws IOException {
- MergeStream ms; // close inputs
- while ((ms = (MergeStream)pop()) != null) {
- ms.in.close();
- }
- out.close(); // close output
- if (indexOut != null) {
- indexOut.close();
+ public List <SegmentDescriptor> getSegmentList() {
+ return segments;
+ }
+ public void cleanup() throws IOException {
+ numSegmentsCleanedUp++;
+ if (numSegmentsCleanedUp == numSegmentsContained) {
+ fs.delete(inName);
}
}
-
- } // SequenceFile.Sorter.MergeQueue
-
+ } //SequenceFile.Sorter.SegmentContainer
+
} // SequenceFile.Sorter
} // SequenceFile
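For finer-grained control, callers can describe segments explicitly as byte
ranges within files and hand them to merge(List<SegmentDescriptor>). Below is
a hedged sketch mirroring what merge(Path[], boolean) does internally, with
hypothetical paths; note that SegmentDescriptor is an inner class of Sorter,
hence the sorter.new syntax:

    // assumes fs and sorter from the earlier sketch; uses java.util.ArrayList
    ArrayList<SequenceFile.Sorter.SegmentDescriptor> segments =
      new ArrayList<SequenceFile.Sorter.SegmentDescriptor>();
    for (Path p : new Path[] { new Path("part-0"), new Path("part-1") }) {
      SequenceFile.Sorter.SegmentDescriptor s =
        sorter.new SegmentDescriptor(0, fs.getLength(p), p);
      s.preserveInput(true);  // keep the input files after the merge
      s.doSync();             // honor sync marks in these (non-temp) files
      segments.add(s);
    }
    SequenceFile.Sorter.RawKeyValueIterator iter = sorter.merge(segments);
    while (iter.next()) {
      // iter.getKey() / iter.getValue() expose the current raw key/value
    }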