Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/06/22 21:14:58 UTC
svn commit: r416447 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/io/SequenceFile.java src/java/org/apache/hadoop/mapred/ReduceTask.java
Author: cutting
Date: Thu Jun 22 12:14:58 2006
New Revision: 416447
URL: http://svn.apache.org/viewvc?rev=416447&view=rev
Log:
HADOOP-314. Remove the append phase when reducing. Contributed by Owen.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
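
For orientation before the diffs: previously the reduce task appended every map output into a single local file ("all.1") and then ran a one-input sort over it; with this change the sorter consumes the map output files directly. In caller terms, using the names from the diffs below (a sketch, not the literal code):

    // Old flow, removed from ReduceTask.java below:
    //   append every record of every map output into "all.1", then
    //   sorter.sort(file, sortedFile); lfs.delete(file);
    // New flow, added below: sort many inputs in one pass, deleting
    // each input file as soon as its records have been read.
    sorter.sort(mapFiles, sortedFile, true);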
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=416447&r1=416446&r2=416447&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Jun 22 12:14:58 2006
@@ -30,6 +30,14 @@
single read failure will not alone cause failure of a task.
(omalley via cutting)
+ 8. HADOOP-314. Remove the "append" phase when reducing. Map output
+ files are now directly passed to the sorter, without first
+ appending them into a single file. Now, the first third of reduce
+ progress is "copy" (transferring map output to reduce nodes), the
+ middle third is "sort" (sorting map output) and the last third is
+ "reduce" (generating output). Long-term, the "sort" phase will
+ also be removed. (omalley via cutting)
+
Release 0.3.2 - 2006-06-09
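
The "thirds" in the entry above follow from how the task's progress tree weights its children: each addPhase call on org.apache.hadoop.util.Progress adds an equal-weight phase, so with "append" gone the three remaining phases each account for a third of overall progress. A rough sketch of those semantics as assumed here (the actual fields appear in the ReduceTask.java diff below):

    Progress root = new Progress();
    Progress copy = root.addPhase("copy");
    Progress sort = root.addPhase("sort");
    Progress reduce = root.addPhase("reduce");
    copy.complete();   // advances root past phase 1: overall progress ~1/3
    sort.set(0.5f);    // halfway through the middle third: root.get() ~0.5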
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?rev=416447&r1=416446&r2=416447&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Thu Jun 22 12:14:58 2006
@@ -49,7 +49,6 @@
public static class Writer {
private FSDataOutputStream out;
private DataOutputBuffer buffer = new DataOutputBuffer();
- private FileSystem fs = null;
private Path target = null;
private Class keyClass;
@@ -95,7 +94,6 @@
public Writer(FileSystem fs, Path name,
Class keyClass, Class valClass, boolean compress)
throws IOException {
- this.fs = fs;
this.target = name;
init(fs.create(target), keyClass, valClass, compress);
}
@@ -205,7 +203,6 @@
private FSDataInputStream in;
private DataOutputBuffer outBuf = new DataOutputBuffer();
private DataInputBuffer inBuf = new DataInputBuffer();
- private FileSystem fs = null;
private byte[] version = new byte[VERSION.length];
@@ -239,7 +236,6 @@
private Reader(FileSystem fs, Path name, int bufferSize,
Configuration conf) throws IOException {
- this.fs = fs;
this.file = name;
this.in = fs.open(file, bufferSize);
this.end = fs.getLength(file);
@@ -249,7 +245,6 @@
private Reader(FileSystem fs, Path file, int bufferSize, long start,
long length, Configuration conf) throws IOException {
- this.fs = fs;
this.file = file;
this.in = fs.open(file, bufferSize);
this.conf = conf;
@@ -465,8 +460,7 @@
private WritableComparator comparator;
- private Path inFile; // when sorting
- private Path[] inFiles; // when merging
+ private Path[] inFiles; // when merging or sorting
private Path outFile;
@@ -508,16 +502,22 @@
/** Get the total amount of buffer memory, in bytes.*/
public int getMemory() { return memory; }
- /** Perform a file sort.*/
- public void sort(Path inFile, Path outFile) throws IOException {
+ /**
+ * Perform a file sort from a set of input files into an output file.
+ * @param inFiles the files to be sorted
+ * @param outFile the sorted output file
+ * @param deleteInput should the input files be deleted as they are read?
+ */
+ public void sort(Path[] inFiles, Path outFile,
+ boolean deleteInput) throws IOException {
if (fs.exists(outFile)) {
throw new IOException("already exists: " + outFile);
}
- this.inFile = inFile;
+ this.inFiles = inFiles;
this.outFile = outFile;
- int segments = sortPass();
+ int segments = sortPass(deleteInput);
int pass = 1;
while (segments > 1) {
segments = mergePass(pass, segments <= factor);
@@ -525,11 +525,20 @@
}
}
- private int sortPass() throws IOException {
+ /**
+ * The backwards compatible interface to sort.
+ * @param inFile the input file to sort
+ * @param outFile the sorted output file
+ */
+ public void sort(Path inFile, Path outFile) throws IOException {
+ sort(new Path[]{inFile}, outFile, false);
+ }
+
+ private int sortPass(boolean deleteInput) throws IOException {
LOG.debug("running sort pass");
- SortPass sortPass = new SortPass(this.conf); // make the SortPass
+ SortPass sortPass = new SortPass(); // make the SortPass
try {
- return sortPass.run(); // run it
+ return sortPass.run(deleteInput); // run it
} finally {
sortPass.close(); // close it
}
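
From caller code, the two entry points above look like this (a usage sketch; the Path values are assumed):

    // New multi-file form: sort several SequenceFiles into one output,
    // optionally deleting each input once its records are consumed.
    sorter.sort(new Path[]{part0, part1}, sorted, true);

    // Backwards-compatible form: delegates to the array version with
    // deleteInput == false, so the single input file is preserved.
    sorter.sort(unsorted, sorted);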
@@ -550,13 +559,15 @@
private FSDataOutputStream out;
private Path outName;
- public SortPass(Configuration conf) throws IOException {
- in = new Reader(fs, inFile, conf);
- }
-
- public int run() throws IOException {
+ public int run(boolean deleteInput) throws IOException {
int segments = 0;
- boolean atEof = false;
+ int currentFile = 0;
+ boolean atEof = currentFile >= inFiles.length;
+ boolean isCompressed = false;
+ if (!atEof) {
+ in = new Reader(fs, inFiles[currentFile], conf);
+ isCompressed = in.isCompressed();
+ }
while (!atEof) {
int count = 0;
buffer.reset();
@@ -564,12 +575,21 @@
int start = buffer.getLength(); // read an entry into buffer
int keyLength = in.next(buffer);
- int length = buffer.getLength() - start;
-
if (keyLength == -1) {
- atEof = true;
- break;
+ in.close();
+ if (deleteInput) {
+ fs.delete(inFiles[currentFile]);
+ }
+ currentFile += 1;
+ atEof = currentFile >= inFiles.length;
+ if (!atEof) {
+ in = new Reader(fs, inFiles[currentFile], conf);
+ } else {
+ in = null;
+ }
+ continue;
}
+ int length = buffer.getLength() - start;
if (count == starts.length)
grow();
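
The reshaped run() above walks a list of inputs instead of holding a single pre-opened Reader: when one file is exhausted it is closed (and deleted, if deleteInput is set) before the next is opened, so the in-memory sort buffer fills seamlessly across file boundaries. Simplified, ignoring the run-flushing the real loop interleaves (a restatement, not the literal code):

    while (currentFile < inFiles.length) {
      Reader in = new Reader(fs, inFiles[currentFile], conf);
      while (in.next(buffer) != -1) {
        // record each entry's offset and key length for the in-memory sort
      }
      in.close();
      if (deleteInput) {
        fs.delete(inFiles[currentFile]); // reclaim disk as soon as a file is drained
      }
      currentFile++;
    }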
@@ -586,15 +606,16 @@
LOG.info("flushing segment " + segments);
rawBuffer = buffer.getData();
sort(count);
- flush(count, segments==0 && atEof);
+ flush(count, isCompressed, segments==0 && atEof);
segments++;
}
return segments;
}
public void close() throws IOException {
- in.close();
-
+ if (in != null) {
+ in.close();
+ }
if (out != null) {
out.close();
}
@@ -615,7 +636,8 @@
return result;
}
- private void flush(int count, boolean done) throws IOException {
+ private void flush(int count, boolean isCompressed,
+ boolean done) throws IOException {
if (out == null) {
outName = done ? outFile : outFile.suffix(".0");
out = fs.create(outName);
@@ -630,7 +652,7 @@
out.writeLong(count); // write count
}
- Writer writer = new Writer(out, keyClass, valClass, in.isCompressed());
+ Writer writer = new Writer(out, keyClass, valClass, isCompressed);
if (!done) {
writer.sync = null; // disable sync on temp files
}
@@ -701,7 +723,6 @@
}
private class MergePass {
- private int pass;
private boolean last;
private MergeQueue queue;
@@ -709,7 +730,6 @@
private Path inName;
public MergePass(int pass, boolean last) throws IOException {
- this.pass = pass;
this.last = last;
this.queue =
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=416447&r1=416446&r2=416447&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu Jun 22 12:14:58 2006
@@ -44,7 +44,6 @@
{ getProgress().setStatus("reduce"); }
private Progress copyPhase = getProgress().addPhase("copy");
- private Progress appendPhase = getProgress().addPhase("append");
private Progress sortPhase = getProgress().addPhase("sort");
private Progress reducePhase = getProgress().addPhase("reduce");
private JobConf conf;
@@ -173,7 +172,6 @@
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException {
- Class keyClass = job.getMapOutputKeyClass();
Class valueClass = job.getMapOutputValueClass();
Reducer reducer = (Reducer)job.newInstance(job.getReducerClass());
reducer.configure(job);
@@ -182,44 +180,10 @@
copyPhase.complete(); // copy is already complete
// open a file to collect map output
- Path file = job.getLocalPath(getTaskId()+Path.SEPARATOR+"all.1");
- SequenceFile.Writer writer =
- new SequenceFile.Writer(lfs, file, keyClass, valueClass);
- try {
- // append all input files into a single input file
- for (int i = 0; i < numMaps; i++) {
- appendPhase.addPhase(); // one per file
- }
-
- DataOutputBuffer buffer = new DataOutputBuffer();
-
- for (int i = 0; i < numMaps; i++) {
- Path partFile =
- this.mapOutputFile.getInputFile(i, getTaskId());
- float progPerByte = 1.0f / lfs.getLength(partFile);
- Progress phase = appendPhase.phase();
- phase.setStatus(partFile.toString());
-
- SequenceFile.Reader in = new SequenceFile.Reader(lfs, partFile, job);
- try {
- int keyLen;
- while((keyLen = in.next(buffer)) > 0) {
- writer.append(buffer.getData(), 0, buffer.getLength(), keyLen);
- phase.set(in.getPosition()*progPerByte);
- reportProgress(umbilical);
- buffer.reset();
- }
- } finally {
- in.close();
- }
- phase.complete();
- }
-
- } finally {
- writer.close();
+ Path[] mapFiles = new Path[numMaps];
+ for(int i=0; i < numMaps; i++) {
+ mapFiles[i] = mapOutputFile.getInputFile(i, getTaskId());
}
-
- appendPhase.complete(); // append is complete
// spawn a thread to give sort progress heartbeats
Thread sortProgress = new Thread() {
@@ -251,8 +215,7 @@
// sort the input file
SequenceFile.Sorter sorter =
new SequenceFile.Sorter(lfs, comparator, valueClass, job);
- sorter.sort(file, sortedFile); // sort
- lfs.delete(file); // remove unsorted
+ sorter.sort(mapFiles, sortedFile, true); // sort
} finally {
sortComplete = true;
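
Rough disk-footprint arithmetic for why removing the append matters (illustrative numbers, not from the source): with 100 map outputs of 100 MB each, a reduce holds S = 10 GB of map output. The old path kept the part files and the appended "all.1" on disk simultaneously, roughly 2S before sorting even began; the new path deletes each part as it is drained, keeping peak usage near S plus the sorted runs in flight.

    S        = 100 parts x 100 MB       = 10 GB
    old peak ~ S (parts) + S ("all.1")  = 20 GB, plus sort temp files
    new peak ~ S, shrinking as parts and temp runs are deleted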