You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/11/26 02:40:22 UTC

svn commit: r720698 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/MapTask.java src/mapred/org/apache/hadoop/mapred/Merger.java

Author: cdouglas
Date: Tue Nov 25 17:40:22 2008
New Revision: 720698

URL: http://svn.apache.org/viewvc?rev=720698&view=rev
Log:
HADOOP-4614. Lazily open segments when merging map spills to avoid using
too many file descriptors. Contributed by Yuri Pradkin.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=720698&r1=720697&r2=720698&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Nov 25 17:40:22 2008
@@ -1250,6 +1250,9 @@
     HADOOP-4659. Root cause of connection failure is being lost to code that
     uses it for delaying startup. (Steve Loughran and Hairong via hairong)
 
+    HADOOP-4614. Lazily open segments when merging map spills to avoid using
+    too many file descriptors. (Yuri Pradkin via cdouglas)
+
 Release 0.18.2 - 2008-11-03
 
   BUG FIXES

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=720698&r1=720697&r2=720698&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Tue Nov 25 17:40:22 2008
@@ -1236,27 +1236,23 @@
             new ArrayList<Segment<K, V>>(numSpills);
           TaskAttemptID mapId = getTaskID();
           for(int i = 0; i < numSpills; i++) {
-            IndexRecord indexRecord = 
+            final IndexRecord indexRecord =
               getIndexInformation(mapId, i, parts);
 
             long segmentOffset = indexRecord.startOffset;
-            long rawSegmentLength = indexRecord.rawLength;
             long segmentLength = indexRecord.partLength;
 
-            FSDataInputStream in = rfs.open(filename[i]);
-            in.seek(segmentOffset);
-
-            Segment<K, V> s = 
-              new Segment<K, V>(new Reader<K, V>(job, in, segmentLength,
-                                                 codec, null), true);
+            Segment<K, V> s =
+              new Segment<K, V>(job, rfs, filename[i], segmentOffset,
+                                segmentLength, codec, true);
             segmentList.add(i, s);
             
             if (LOG.isDebugEnabled()) {
+              long rawSegmentLength = indexRecord.rawLength;
               LOG.debug("MapId=" + mapId + " Reducer=" + parts +
                   "Spill =" + i + "(" + segmentOffset + ","+ 
                         rawSegmentLength + ", " + segmentLength + ")");
             }
-            indexRecord = null;
           }
           
           //merge

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java?rev=720698&r1=720697&r2=720698&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java Tue Nov 25 17:40:22 2008
@@ -26,6 +26,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.ChecksumFileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -135,17 +136,25 @@
     Path file = null;
     boolean preserve = false;
     CompressionCodec codec = null;
+    long segmentOffset = 0;
     long segmentLength = -1;
     
     public Segment(Configuration conf, FileSystem fs, Path file,
                    CompressionCodec codec, boolean preserve) throws IOException {
+      this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve);
+    }
+
+    public Segment(Configuration conf, FileSystem fs, Path file,
+        long segmentOffset, long segmentLength, CompressionCodec codec,
+        boolean preserve) throws IOException {
       this.conf = conf;
       this.fs = fs;
       this.file = file;
       this.codec = codec;
       this.preserve = preserve;
-      
-      this.segmentLength = fs.getFileStatus(file).getLen();
+
+      this.segmentOffset = segmentOffset;
+      this.segmentLength = segmentLength;
     }
     
     public Segment(Reader<K, V> reader, boolean preserve) {
@@ -157,7 +166,9 @@
 
     private void init(Counters.Counter readsCounter) throws IOException {
       if (reader == null) {
-        reader = new Reader<K, V>(conf, fs, file, codec, readsCounter);
+        FSDataInputStream in = fs.open(file);
+        in.seek(segmentOffset);
+        reader = new Reader<K, V>(conf, in, segmentLength, codec, readsCounter);
       }
     }