You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/11/26 02:41:14 UTC
svn commit: r720701 - in /hadoop/core/branches/branch-0.19: CHANGES.txt
src/mapred/org/apache/hadoop/mapred/MapTask.java
src/mapred/org/apache/hadoop/mapred/Merger.java
Author: cdouglas
Date: Tue Nov 25 17:41:14 2008
New Revision: 720701
URL: http://svn.apache.org/viewvc?rev=720701&view=rev
Log:
HADOOP-4614. Lazily open segments when merging map spills to avoid using
too many file descriptors. Contributed by Yuri Pradkin.
Modified:
hadoop/core/branches/branch-0.19/CHANGES.txt
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/MapTask.java
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/Merger.java
Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=720701&r1=720700&r2=720701&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Tue Nov 25 17:41:14 2008
@@ -1026,6 +1026,9 @@
HADOOP-4659. Root cause of connection failure is being lost to code that
uses it for delaying startup. (Steve Loughran and Hairong via hairong)
+ HADOOP-4614. Lazily open segments when merging map spills to avoid using
+ too many file descriptors. (Yuri Pradkin via cdouglas)
+
Release 0.18.2 - 2008-11-03
BUG FIXES
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=720701&r1=720700&r2=720701&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/MapTask.java Tue Nov 25 17:41:14 2008
@@ -1234,27 +1234,23 @@
new ArrayList<Segment<K, V>>(numSpills);
TaskAttemptID mapId = getTaskID();
for(int i = 0; i < numSpills; i++) {
- IndexRecord indexRecord =
+ final IndexRecord indexRecord =
getIndexInformation(mapId, i, parts);
long segmentOffset = indexRecord.startOffset;
- long rawSegmentLength = indexRecord.rawLength;
long segmentLength = indexRecord.partLength;
- FSDataInputStream in = rfs.open(filename[i]);
- in.seek(segmentOffset);
-
Segment<K, V> s =
- new Segment<K, V>(new Reader<K, V>(job, in, segmentLength, codec),
- true);
+ new Segment<K, V>(job, rfs, filename[i], segmentOffset,
+ segmentLength, codec, true);
segmentList.add(i, s);
if (LOG.isDebugEnabled()) {
+ long rawSegmentLength = indexRecord.rawLength;
LOG.debug("MapId=" + mapId + " Reducer=" + parts +
"Spill =" + i + "(" + segmentOffset + ","+
rawSegmentLength + ", " + segmentLength + ")");
}
- indexRecord = null;
}
//merge
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/Merger.java?rev=720701&r1=720700&r2=720701&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/Merger.java Tue Nov 25 17:41:14 2008
@@ -26,6 +26,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.ChecksumFileSystem;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
@@ -124,17 +125,25 @@
Path file = null;
boolean preserve = false;
CompressionCodec codec = null;
+ long segmentOffset = 0;
long segmentLength = -1;
+
public Segment(Configuration conf, FileSystem fs, Path file,
CompressionCodec codec, boolean preserve) throws IOException {
+ this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve);
+ }
+
+ public Segment(Configuration conf, FileSystem fs, Path file,
+ long segmentOffset, long segmentLength,
+ CompressionCodec codec, boolean preserve) throws IOException {
this.conf = conf;
this.fs = fs;
this.file = file;
this.codec = codec;
this.preserve = preserve;
-
- this.segmentLength = fs.getFileStatus(file).getLen();
+ this.segmentLength = segmentLength;
+ this.segmentOffset = segmentOffset;
}
public Segment(Reader<K, V> reader, boolean preserve) {
@@ -146,7 +155,9 @@
private void init() throws IOException {
if (reader == null) {
- reader = new Reader<K, V>(conf, fs, file, codec);
+ FSDataInputStream in = fs.open(file);
+ in.seek(segmentOffset);
+ reader = new Reader<K, V>(conf, in, segmentLength, codec);
}
}