You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text rendering.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/11/26 02:40:22 UTC
svn commit: r720698 - in /hadoop/core/trunk: CHANGES.txt
src/mapred/org/apache/hadoop/mapred/MapTask.java
src/mapred/org/apache/hadoop/mapred/Merger.java
Author: cdouglas
Date: Tue Nov 25 17:40:22 2008
New Revision: 720698
URL: http://svn.apache.org/viewvc?rev=720698&view=rev
Log:
HADOOP-4614. Lazily open segments when merging map spills to avoid using
too many file descriptors. Contributed by Yuri Pradkin.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=720698&r1=720697&r2=720698&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Nov 25 17:40:22 2008
@@ -1250,6 +1250,9 @@
HADOOP-4659. Root cause of connection failure is being lost to code that
uses it for delaying startup. (Steve Loughran and Hairong via hairong)
+ HADOOP-4614. Lazily open segments when merging map spills to avoid using
+ too many file descriptors. (Yuri Pradkin via cdouglas)
+
Release 0.18.2 - 2008-11-03
BUG FIXES
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=720698&r1=720697&r2=720698&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Tue Nov 25 17:40:22 2008
@@ -1236,27 +1236,23 @@
new ArrayList<Segment<K, V>>(numSpills);
TaskAttemptID mapId = getTaskID();
for(int i = 0; i < numSpills; i++) {
- IndexRecord indexRecord =
+ final IndexRecord indexRecord =
getIndexInformation(mapId, i, parts);
long segmentOffset = indexRecord.startOffset;
- long rawSegmentLength = indexRecord.rawLength;
long segmentLength = indexRecord.partLength;
- FSDataInputStream in = rfs.open(filename[i]);
- in.seek(segmentOffset);
-
- Segment<K, V> s =
- new Segment<K, V>(new Reader<K, V>(job, in, segmentLength,
- codec, null), true);
+ Segment<K, V> s =
+ new Segment<K, V>(job, rfs, filename[i], segmentOffset,
+ segmentLength, codec, true);
segmentList.add(i, s);
if (LOG.isDebugEnabled()) {
+ long rawSegmentLength = indexRecord.rawLength;
LOG.debug("MapId=" + mapId + " Reducer=" + parts +
"Spill =" + i + "(" + segmentOffset + ","+
rawSegmentLength + ", " + segmentLength + ")");
}
- indexRecord = null;
}
//merge
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java?rev=720698&r1=720697&r2=720698&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java Tue Nov 25 17:40:22 2008
@@ -26,6 +26,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.ChecksumFileSystem;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
@@ -135,17 +136,25 @@
Path file = null;
boolean preserve = false;
CompressionCodec codec = null;
+ long segmentOffset = 0;
long segmentLength = -1;
public Segment(Configuration conf, FileSystem fs, Path file,
CompressionCodec codec, boolean preserve) throws IOException {
+ this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve);
+ }
+
+ public Segment(Configuration conf, FileSystem fs, Path file,
+ long segmentOffset, long segmentLength, CompressionCodec codec,
+ boolean preserve) throws IOException {
this.conf = conf;
this.fs = fs;
this.file = file;
this.codec = codec;
this.preserve = preserve;
-
- this.segmentLength = fs.getFileStatus(file).getLen();
+
+ this.segmentOffset = segmentOffset;
+ this.segmentLength = segmentLength;
}
public Segment(Reader<K, V> reader, boolean preserve) {
@@ -157,7 +166,9 @@
private void init(Counters.Counter readsCounter) throws IOException {
if (reader == null) {
- reader = new Reader<K, V>(conf, fs, file, codec, readsCounter);
+ FSDataInputStream in = fs.open(file);
+ in.seek(segmentOffset);
+ reader = new Reader<K, V>(conf, in, segmentLength, codec, readsCounter);
}
}