Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/06/22 21:14:58 UTC

svn commit: r416447 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/io/SequenceFile.java src/java/org/apache/hadoop/mapred/ReduceTask.java

Author: cutting
Date: Thu Jun 22 12:14:58 2006
New Revision: 416447

URL: http://svn.apache.org/viewvc?rev=416447&view=rev
Log:
HADOOP-314.  Remove the append phase when reducing.  Contributed by Owen.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=416447&r1=416446&r2=416447&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Jun 22 12:14:58 2006
@@ -30,6 +30,14 @@
     single read failure will not alone cause failure of a task.
     (omalley via cutting)
 
+ 8. HADOOP-314.  Remove the "append" phase when reducing.  Map output
+    files are now directly passed to the sorter, without first
+    appending them into a single file.  Now, the first third of reduce
+    progress is "copy" (transferring map output to reduce nodes), the
+    middle third is "sort" (sorting map output) and the last third is
+    "reduce" (generating output).  Long-term, the "sort" phase will
+    also be removed.  (omalley via cutting)
+
 
 Release 0.3.2 - 2006-06-09
 

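For readers skimming the diffs below, the core of the change is a widened
SequenceFile.Sorter interface: sort() now accepts an array of input Paths plus
a deleteInput flag, and the old single-file signature is kept as a thin wrapper
around the new one.  A minimal usage sketch, not part of the commit (the key
and value classes, comparator, and file names are illustrative assumptions;
the Sorter constructor and sort() signatures match the diff):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.UTF8;
    import org.apache.hadoop.io.WritableComparator;

    public class SorterSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // Constructor arguments mirror the ReduceTask usage further down:
        // (file system, key comparator, value class, configuration).
        SequenceFile.Sorter sorter = new SequenceFile.Sorter(
            fs, WritableComparator.get(UTF8.class), LongWritable.class, conf);

        // New form: several unsorted inputs go straight to the sorter; with
        // deleteInput=true each input is removed as soon as it has been read.
        // Assumes part-0 and part-1 are existing SequenceFiles of UTF8 keys
        // and LongWritable values.
        Path[] inputs = { new Path("part-0"), new Path("part-1") };
        sorter.sort(inputs, new Path("all.sorted"), true);

        // Old single-file form still works; it now simply calls
        // sort(new Path[]{inFile}, outFile, false).
        sorter.sort(new Path("single.in"), new Path("single.sorted"));
      }
    }

ReduceTask (last diff below) uses exactly this shape, passing the per-map
output files with deleteInput=true so each map output is removed once the
sorter has consumed it.
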
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?rev=416447&r1=416446&r2=416447&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Thu Jun 22 12:14:58 2006
@@ -49,7 +49,6 @@
   public static class Writer {
     private FSDataOutputStream out;
     private DataOutputBuffer buffer = new DataOutputBuffer();
-    private FileSystem fs = null;
     private Path target = null;
 
     private Class keyClass;
@@ -95,7 +94,6 @@
     public Writer(FileSystem fs, Path name,
                   Class keyClass, Class valClass, boolean compress)
       throws IOException {
-      this.fs = fs;
       this.target = name;
       init(fs.create(target), keyClass, valClass, compress);
     }
@@ -205,7 +203,6 @@
     private FSDataInputStream in;
     private DataOutputBuffer outBuf = new DataOutputBuffer();
     private DataInputBuffer inBuf = new DataInputBuffer();
-    private FileSystem fs = null;
 
     private byte[] version = new byte[VERSION.length];
 
@@ -239,7 +236,6 @@
 
     private Reader(FileSystem fs, Path name, int bufferSize,
                    Configuration conf) throws IOException {
-      this.fs = fs;
       this.file = name;
       this.in = fs.open(file, bufferSize);
       this.end = fs.getLength(file);
@@ -249,7 +245,6 @@
     
     private Reader(FileSystem fs, Path file, int bufferSize, long start,
                    long length, Configuration conf) throws IOException {
-      this.fs = fs;
       this.file = file;
       this.in = fs.open(file, bufferSize);
       this.conf = conf;
@@ -465,8 +460,7 @@
 
     private WritableComparator comparator;
 
-    private Path inFile;                        // when sorting
-    private Path[] inFiles;                     // when merging
+    private Path[] inFiles;                     // when merging or sorting
 
     private Path outFile;
 
@@ -508,16 +502,22 @@
     /** Get the total amount of buffer memory, in bytes.*/
     public int getMemory() { return memory; }
 
-    /** Perform a file sort.*/
-    public void sort(Path inFile, Path outFile) throws IOException {
+    /** 
+     * Perform a file sort from a set of input files into an output file.
+     * @param inFiles the files to be sorted
+     * @param outFile the sorted output file
+     * @param deleteInput should the input files be deleted as they are read?
+     */
+    public void sort(Path[] inFiles, Path outFile,
+                     boolean deleteInput) throws IOException {
       if (fs.exists(outFile)) {
         throw new IOException("already exists: " + outFile);
       }
 
-      this.inFile = inFile;
+      this.inFiles = inFiles;
       this.outFile = outFile;
 
-      int segments = sortPass();
+      int segments = sortPass(deleteInput);
       int pass = 1;
       while (segments > 1) {
         segments = mergePass(pass, segments <= factor);
@@ -525,11 +525,20 @@
       }
     }
 
-    private int sortPass() throws IOException {
+    /**
+     * The backwards compatible interface to sort.
+     * @param inFile the input file to sort
+     * @param outFile the sorted output file
+     */
+    public void sort(Path inFile, Path outFile) throws IOException {
+      sort(new Path[]{inFile}, outFile, false);
+    }
+    
+    private int sortPass(boolean deleteInput) throws IOException {
       LOG.debug("running sort pass");
-      SortPass sortPass = new SortPass(this.conf);         // make the SortPass
+      SortPass sortPass = new SortPass();         // make the SortPass
       try {
-        return sortPass.run();                    // run it
+        return sortPass.run(deleteInput);         // run it
       } finally {
         sortPass.close();                         // close it
       }
@@ -550,13 +559,15 @@
       private FSDataOutputStream out;
       private Path outName;
 
-      public SortPass(Configuration conf) throws IOException {
-        in = new Reader(fs, inFile, conf);
-      }
-      
-      public int run() throws IOException {
+      public int run(boolean deleteInput) throws IOException {
         int segments = 0;
-        boolean atEof = false;
+        int currentFile = 0;
+        boolean atEof = currentFile >= inFiles.length;
+        boolean isCompressed = false;
+        if (!atEof) {
+          in = new Reader(fs, inFiles[currentFile], conf);
+          isCompressed = in.isCompressed();
+        }
         while (!atEof) {
           int count = 0;
           buffer.reset();
@@ -564,12 +575,21 @@
 
             int start = buffer.getLength();       // read an entry into buffer
             int keyLength = in.next(buffer);
-            int length = buffer.getLength() - start;
-
             if (keyLength == -1) {
-              atEof = true;
-              break;
+              in.close();
+              if (deleteInput) {
+                fs.delete(inFiles[currentFile]);
+              }
+              currentFile += 1;
+              atEof = currentFile >= inFiles.length;
+              if (!atEof) {
+                in = new Reader(fs, inFiles[currentFile], conf);
+              } else {
+                in = null;
+              }
+              continue;
             }
+            int length = buffer.getLength() - start;
 
             if (count == starts.length)
               grow();
@@ -586,15 +606,16 @@
           LOG.info("flushing segment " + segments);
           rawBuffer = buffer.getData();
           sort(count);
-          flush(count, segments==0 && atEof);
+          flush(count, isCompressed, segments==0 && atEof);
           segments++;
         }
         return segments;
       }
 
       public void close() throws IOException {
-        in.close();
-
+        if (in != null) {
+          in.close();
+        }
         if (out != null) {
           out.close();
         }
@@ -615,7 +636,8 @@
         return result;
       }
 
-      private void flush(int count, boolean done) throws IOException {
+      private void flush(int count, boolean isCompressed, 
+                         boolean done) throws IOException {
         if (out == null) {
           outName = done ? outFile : outFile.suffix(".0");
           out = fs.create(outName);
@@ -630,7 +652,7 @@
           out.writeLong(count);                   // write count
         }
 
-        Writer writer = new Writer(out, keyClass, valClass, in.isCompressed());
+        Writer writer = new Writer(out, keyClass, valClass, isCompressed);
         if (!done) {
           writer.sync = null;                     // disable sync on temp files
         }
@@ -701,7 +723,6 @@
     }
 
     private class MergePass {
-      private int pass;
       private boolean last;
 
       private MergeQueue queue;
@@ -709,7 +730,6 @@
       private Path inName;
 
       public MergePass(int pass, boolean last) throws IOException {
-        this.pass = pass;
         this.last = last;
 
         this.queue =

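Taken together, the SortPass hunks above change run() so it no longer assumes a
single pre-appended input: it opens the first file, keeps filling the same
in-memory buffer across file boundaries, and at each end-of-file closes the
current reader, optionally deletes that input, and moves on to the next one.
The compression flag is captured from the first reader up front because the
reader may already have been switched or closed by the time flush() runs.  A
stand-alone illustration of the reader-switching pattern (plain java.io, not
the committed code; the sort buffer is reduced to a List for brevity):

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    public class MultiFileScan {
      // Reads every line of every input into one buffer, switching readers at
      // EOF and optionally deleting each input once it has been fully read.
      static List<String> scan(File[] inputs, boolean deleteInput)
          throws IOException {
        List<String> buffer = new ArrayList<String>();
        int currentFile = 0;
        boolean atEof = currentFile >= inputs.length;
        BufferedReader in =
            atEof ? null : new BufferedReader(new FileReader(inputs[currentFile]));
        while (!atEof) {
          String line = in.readLine();
          if (line == null) {                        // end of the current input
            in.close();
            if (deleteInput) {
              inputs[currentFile].delete();
            }
            currentFile++;
            atEof = currentFile >= inputs.length;
            in = atEof ? null
                       : new BufferedReader(new FileReader(inputs[currentFile]));
            continue;
          }
          buffer.add(line);                          // accumulate across files
        }
        return buffer;
      }
    }
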
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=416447&r1=416446&r2=416447&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu Jun 22 12:14:58 2006
@@ -44,7 +44,6 @@
   { getProgress().setStatus("reduce"); }
 
   private Progress copyPhase = getProgress().addPhase("copy");
-  private Progress appendPhase = getProgress().addPhase("append");
   private Progress sortPhase  = getProgress().addPhase("sort");
   private Progress reducePhase = getProgress().addPhase("reduce");
   private JobConf conf;
@@ -173,7 +172,6 @@
 
   public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
-    Class keyClass = job.getMapOutputKeyClass();
     Class valueClass = job.getMapOutputValueClass();
     Reducer reducer = (Reducer)job.newInstance(job.getReducerClass());
     reducer.configure(job);
@@ -182,44 +180,10 @@
     copyPhase.complete();                         // copy is already complete
 
     // open a file to collect map output
-    Path file = job.getLocalPath(getTaskId()+Path.SEPARATOR+"all.1");
-    SequenceFile.Writer writer =
-      new SequenceFile.Writer(lfs, file, keyClass, valueClass);
-    try {
-      // append all input files into a single input file
-      for (int i = 0; i < numMaps; i++) {
-        appendPhase.addPhase();                 // one per file
-      }
-      
-      DataOutputBuffer buffer = new DataOutputBuffer();
-
-      for (int i = 0; i < numMaps; i++) {
-        Path partFile =
-          this.mapOutputFile.getInputFile(i, getTaskId());
-        float progPerByte = 1.0f / lfs.getLength(partFile);
-        Progress phase = appendPhase.phase();
-        phase.setStatus(partFile.toString());
-
-        SequenceFile.Reader in = new SequenceFile.Reader(lfs, partFile, job);
-        try {
-          int keyLen;
-          while((keyLen = in.next(buffer)) > 0) {
-            writer.append(buffer.getData(), 0, buffer.getLength(), keyLen);
-            phase.set(in.getPosition()*progPerByte);
-            reportProgress(umbilical);
-            buffer.reset();
-          }
-        } finally {
-          in.close();
-        }
-        phase.complete();
-      }
-      
-    } finally {
-      writer.close();
+    Path[] mapFiles = new Path[numMaps];
+    for(int i=0; i < numMaps; i++) {
+      mapFiles[i] = mapOutputFile.getInputFile(i, getTaskId());
     }
-      
-    appendPhase.complete();                     // append is complete
 
     // spawn a thread to give sort progress heartbeats
     Thread sortProgress = new Thread() {
@@ -251,8 +215,7 @@
       // sort the input file
       SequenceFile.Sorter sorter =
         new SequenceFile.Sorter(lfs, comparator, valueClass, job);
-      sorter.sort(file, sortedFile);              // sort
-      lfs.delete(file);                           // remove unsorted
+      sorter.sort(mapFiles, sortedFile, true);              // sort
 
     } finally {
       sortComplete = true;