You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/11/26 02:41:51 UTC

svn commit: r720702 - in /hadoop/core/branches/branch-0.18: CHANGES.txt src/mapred/org/apache/hadoop/mapred/MapTask.java src/mapred/org/apache/hadoop/mapred/Merger.java

Author: cdouglas
Date: Tue Nov 25 17:41:51 2008
New Revision: 720702

URL: http://svn.apache.org/viewvc?rev=720702&view=rev
Log:
HADOOP-4614. Lazily open segments when merging map spills to avoid using
too many file descriptors. Contributed by Yuri Pradkin.

Modified:
    hadoop/core/branches/branch-0.18/CHANGES.txt
    hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/Merger.java

Modified: hadoop/core/branches/branch-0.18/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/CHANGES.txt?rev=720702&r1=720701&r2=720702&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.18/CHANGES.txt Tue Nov 25 17:41:51 2008
@@ -43,6 +43,9 @@
     HADOOP-4659. Root cause of connection failure is being lost to code that
     uses it for delaying startup. (Steve Loughran and Hairong via hairong)
 
+    HADOOP-4614. Lazily open segments when merging map spills to avoid using
+    too many file descriptors. (Yuri Pradkin via cdouglas)
+
 Release 0.18.2 - 2008-11-03
 
   BUG FIXES

Modified: hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=720702&r1=720701&r2=720702&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java Tue Nov 25 17:41:51 2008
@@ -1028,11 +1028,9 @@
             long rawSegmentLength = indexIn.readLong();
             long segmentLength = indexIn.readLong();
             indexIn.close();
-            FSDataInputStream in = localFs.open(filename[i]);
-            in.seek(segmentOffset);
             Segment<K, V> s = 
-              new Segment<K, V>(new Reader<K, V>(job, in, segmentLength, codec),
-                                true);
+              new Segment<K, V>(job, localFs, filename[i], segmentOffset, 
+                                segmentLength, codec, true);
             segmentList.add(i, s);
             
             if (LOG.isDebugEnabled()) {

Modified: hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/Merger.java?rev=720702&r1=720701&r2=720702&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/Merger.java Tue Nov 25 17:41:51 2008
@@ -26,6 +26,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.ChecksumFileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -99,17 +100,25 @@
     Path file = null;
     boolean preserve = false;
     CompressionCodec codec = null;
+    long segmentOffset = 0;
     long segmentLength = -1;
+
     
     public Segment(Configuration conf, FileSystem fs, Path file,
                    CompressionCodec codec, boolean preserve) throws IOException {
+      this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve);
+    }
+
+    public Segment(Configuration conf, FileSystem fs, Path file,
+                   long segmentOffset, long segmentLength, 
+                   CompressionCodec codec, boolean preserve) throws IOException {
       this.conf = conf;
       this.fs = fs;
       this.file = file;
       this.codec = codec;
       this.preserve = preserve;
-      
-      this.segmentLength = fs.getFileStatus(file).getLen();
+      this.segmentLength = segmentLength;
+      this.segmentOffset = segmentOffset;
     }
     
     public Segment(Reader<K, V> reader, boolean preserve) {
@@ -121,7 +130,9 @@
 
     private void init() throws IOException {
       if (reader == null) {
-        reader = new Reader<K, V>(conf, fs, file, codec);
+        FSDataInputStream in = fs.open(file);
+        in.seek(segmentOffset);
+        reader = new Reader<K, V>(conf, in, segmentLength, codec);
       }
     }