Posted to mapreduce-commits@hadoop.apache.org by tu...@apache.org on 2013/01/28 19:58:35 UTC
svn commit: r1439561 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoo...
Author: tucu
Date: Mon Jan 28 18:58:34 2013
New Revision: 1439561
URL: http://svn.apache.org/viewvc?rev=1439561&view=rev
Log:
Reverting MAPREDUCE-2264
Removed:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMerger.java
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1439561&r1=1439560&r2=1439561&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Mon Jan 28 18:58:34 2013
@@ -269,9 +269,6 @@ Release 2.0.3-alpha - Unreleased
MAPREDUCE-4948. Fix a failing unit test TestYARNRunner.testHistoryServerToken.
(Junping Du via sseth)
- MAPREDUCE-2264. Job status exceeds 100% in some cases.
- (devaraj.k and sandyr via tucu)
-
Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES
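
For context: the change being reverted, MAPREDUCE-2264 ("Job status exceeds 100% in some cases", per the CHANGES.txt entry above), weighted merge progress by each segment's raw, uncompressed size. A minimal illustration of the failure mode that patch targeted; the numbers and variable names below are hypothetical, not from the patch:

  // Hypothetical sketch: if the progress denominator is the compressed
  // on-disk size while the merge counts uncompressed bytes as it reads,
  // reported progress can overshoot 100%.
  long compressedTotal = 40L * 1024 * 1024;    // 40 MB on disk after the codec
  long rawBytesRead    = 100L * 1024 * 1024;   // 100 MB of key/value data read
  float progPerByte = 1.0f / (float) compressedTotal;
  float progress = rawBytesRead * progPerByte; // 2.5f, i.e. status of "250%"

With the revert, Merger goes back to sizing segments by their on-disk length, so this behavior can presumably reappear for jobs with compressed intermediate output.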
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java?rev=1439561&r1=1439560&r2=1439561&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java Mon Jan 28 18:58:34 2013
@@ -218,7 +218,6 @@ public class Merger {
CompressionCodec codec = null;
long segmentOffset = 0;
long segmentLength = -1;
- long rawDataLength = -1;
Counters.Counter mapOutputsCounter = null;
@@ -235,15 +234,6 @@ public class Merger {
this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve,
mergedMapOutputsCounter);
}
-
- public Segment(Configuration conf, FileSystem fs, Path file,
- CompressionCodec codec, boolean preserve,
- Counters.Counter mergedMapOutputsCounter, long rawDataLength)
- throws IOException {
- this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve,
- mergedMapOutputsCounter);
- this.rawDataLength = rawDataLength;
- }
public Segment(Configuration conf, FileSystem fs, Path file,
long segmentOffset, long segmentLength,
@@ -271,11 +261,6 @@ public class Merger {
public Segment(Reader<K, V> reader, boolean preserve) {
this(reader, preserve, null);
}
-
- public Segment(Reader<K, V> reader, boolean preserve, long rawDataLength) {
- this(reader, preserve, null);
- this.rawDataLength = rawDataLength;
- }
public Segment(Reader<K, V> reader, boolean preserve,
Counters.Counter mapOutputsCounter) {
@@ -315,10 +300,6 @@ public class Merger {
segmentLength : reader.getLength();
}
- public long getRawDataLength() {
- return (rawDataLength > 0) ? rawDataLength : getLength();
- }
-
boolean nextRawKey() throws IOException {
return reader.nextRawKey(key);
}
@@ -652,7 +633,7 @@ public class Merger {
totalBytesProcessed = 0;
totalBytes = 0;
for (int i = 0; i < segmentsToMerge.size(); i++) {
- totalBytes += segmentsToMerge.get(i).getRawDataLength();
+ totalBytes += segmentsToMerge.get(i).getLength();
}
}
if (totalBytes != 0) //being paranoid
@@ -721,7 +702,7 @@ public class Merger {
// size will match(almost) if combiner is not called in merge.
long inputBytesOfThisMerge = totalBytesProcessed -
bytesProcessedInPrevMerges;
- totalBytes -= inputBytesOfThisMerge - tempSegment.getRawDataLength();
+ totalBytes -= inputBytesOfThisMerge - tempSegment.getLength();
if (totalBytes != 0) {
progPerByte = 1.0f / (float)totalBytes;
}
@@ -787,7 +768,7 @@ public class Merger {
for (int i = 0; i < numSegments; i++) {
// Not handling empty segments here assuming that it would not affect
// much in calculation of mergeProgress.
- segmentSizes.add(segments.get(i).getRawDataLength());
+ segmentSizes.add(segments.get(i).getLength());
}
// If includeFinalMerge is true, allow the following while loop iterate
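
The net effect in Merger.java: everywhere a segment was sized by its raw data length now falls back to Segment.getLength(), which reports the on-disk (possibly compressed) length. A simplified sketch of the restored progress denominator, condensed from the first hunk above (Segment, segmentsToMerge, and progPerByte are assumed to be in scope):

  // Condensed from the Merger.java hunks above.
  long totalBytes = 0;
  for (Segment<K, V> segment : segmentsToMerge) {
    totalBytes += segment.getLength();  // was segment.getRawDataLength()
  }
  if (totalBytes != 0) {                // "being paranoid", as the code says
    progPerByte = 1.0f / (float) totalBytes;
  }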
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java?rev=1439561&r1=1439560&r2=1439561&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Mon Jan 28 18:58:34 2013
@@ -89,7 +89,7 @@ public class MergeManagerImpl<K, V> impl
new TreeSet<InMemoryMapOutput<K,V>>(new MapOutputComparator<K, V>());
private final MergeThread<InMemoryMapOutput<K,V>, K,V> inMemoryMerger;
- Set<CompressAwarePath> onDiskMapOutputs = new TreeSet<CompressAwarePath>();
+ Set<Path> onDiskMapOutputs = new TreeSet<Path>();
private final OnDiskMerger onDiskMerger;
private final long memoryLimit;
@@ -336,7 +336,7 @@ public class MergeManagerImpl<K, V> impl
inMemoryMergedMapOutputs.size());
}
- public synchronized void closeOnDiskFile(CompressAwarePath file) {
+ public synchronized void closeOnDiskFile(Path file) {
onDiskMapOutputs.add(file);
if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
@@ -356,7 +356,7 @@ public class MergeManagerImpl<K, V> impl
List<InMemoryMapOutput<K, V>> memory =
new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
memory.addAll(inMemoryMapOutputs);
- List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
+ List<Path> disk = new ArrayList<Path>(onDiskMapOutputs);
return finalMerge(jobConf, rfs, memory, disk);
}
@@ -456,7 +456,6 @@ public class MergeManagerImpl<K, V> impl
codec, null);
RawKeyValueIterator rIter = null;
- CompressAwarePath compressAwarePath;
try {
LOG.info("Initiating in-memory merge with " + noInMemorySegments +
" segments...");
@@ -475,8 +474,6 @@ public class MergeManagerImpl<K, V> impl
combineCollector.setWriter(writer);
combineAndSpill(rIter, reduceCombineInputCounter);
}
- compressAwarePath = new CompressAwarePath(outputPath,
- writer.getRawLength());
writer.close();
LOG.info(reduceId +
@@ -492,12 +489,12 @@ public class MergeManagerImpl<K, V> impl
}
// Note the output of the merge
- closeOnDiskFile(compressAwarePath);
+ closeOnDiskFile(outputPath);
}
}
- private class OnDiskMerger extends MergeThread<CompressAwarePath,K,V> {
+ private class OnDiskMerger extends MergeThread<Path,K,V> {
public OnDiskMerger(MergeManagerImpl<K, V> manager) {
super(manager, Integer.MAX_VALUE, exceptionReporter);
@@ -506,7 +503,7 @@ public class MergeManagerImpl<K, V> impl
}
@Override
- public void merge(List<CompressAwarePath> inputs) throws IOException {
+ public void merge(List<Path> inputs) throws IOException {
// sanity check
if (inputs == null || inputs.isEmpty()) {
LOG.info("No ondisk files to merge...");
@@ -521,8 +518,8 @@ public class MergeManagerImpl<K, V> impl
" map outputs on disk. Triggering merge...");
// 1. Prepare the list of files to be merged.
- for (CompressAwarePath file : inputs) {
- approxOutputSize += localFS.getFileStatus(file.getPath()).getLen();
+ for (Path file : inputs) {
+ approxOutputSize += localFS.getFileStatus(file).getLen();
}
// add the checksum length
@@ -539,7 +536,6 @@ public class MergeManagerImpl<K, V> impl
(Class<V>) jobConf.getMapOutputValueClass(),
codec, null);
RawKeyValueIterator iter = null;
- CompressAwarePath compressAwarePath;
Path tmpDir = new Path(reduceId.toString());
try {
iter = Merger.merge(jobConf, rfs,
@@ -552,15 +548,13 @@ public class MergeManagerImpl<K, V> impl
mergedMapOutputsCounter, null);
Merger.writeFile(iter, writer, reporter, jobConf);
- compressAwarePath = new CompressAwarePath(outputPath,
- writer.getRawLength());
writer.close();
} catch (IOException e) {
localFS.delete(outputPath, true);
throw e;
}
- closeOnDiskFile(compressAwarePath);
+ closeOnDiskFile(outputPath);
LOG.info(reduceId +
" Finished merging " + inputs.size() +
@@ -659,7 +653,7 @@ public class MergeManagerImpl<K, V> impl
private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
- List<CompressAwarePath> onDiskMapOutputs
+ List<Path> onDiskMapOutputs
) throws IOException {
LOG.info("finalMerge called with " +
inMemoryMapOutputs.size() + " in-memory map-outputs and " +
@@ -718,8 +712,7 @@ public class MergeManagerImpl<K, V> impl
try {
Merger.writeFile(rIter, writer, reporter, job);
// add to list of final disk outputs.
- onDiskMapOutputs.add(new CompressAwarePath(outputPath,
- writer.getRawLength()));
+ onDiskMapOutputs.add(outputPath);
} catch (IOException e) {
if (null != outputPath) {
try {
@@ -749,19 +742,15 @@ public class MergeManagerImpl<K, V> impl
// segments on disk
List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
long onDiskBytes = inMemToDiskBytes;
- long rawBytes = inMemToDiskBytes;
- CompressAwarePath[] onDisk = onDiskMapOutputs.toArray(
- new CompressAwarePath[onDiskMapOutputs.size()]);
- for (CompressAwarePath file : onDisk) {
- long fileLength = fs.getFileStatus(file.getPath()).getLen();
- onDiskBytes += fileLength;
- rawBytes += (file.getRawDataLength() > 0) ? file.getRawDataLength() : fileLength;
-
- LOG.debug("Disk file: " + file + " Length is " + fileLength);
- diskSegments.add(new Segment<K, V>(job, fs, file.getPath(), codec, keepInputs,
+ Path[] onDisk = onDiskMapOutputs.toArray(new Path[onDiskMapOutputs.size()]);
+ for (Path file : onDisk) {
+ onDiskBytes += fs.getFileStatus(file).getLen();
+ LOG.debug("Disk file: " + file + " Length is " +
+ fs.getFileStatus(file).getLen());
+ diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs,
(file.toString().endsWith(
Task.MERGED_OUTPUT_PREFIX) ?
- null : mergedMapOutputsCounter), file.getRawDataLength()
+ null : mergedMapOutputsCounter)
));
}
LOG.info("Merging " + onDisk.length + " files, " +
@@ -797,7 +786,7 @@ public class MergeManagerImpl<K, V> impl
return diskMerge;
}
finalSegments.add(new Segment<K,V>(
- new RawKVIteratorReader(diskMerge, onDiskBytes), true, rawBytes));
+ new RawKVIteratorReader(diskMerge, onDiskBytes), true));
}
return Merger.merge(job, fs, keyClass, valueClass,
finalSegments, finalSegments.size(), tmpDir,
@@ -805,24 +794,4 @@ public class MergeManagerImpl<K, V> impl
null);
}
-
- static class CompressAwarePath
- {
- private long rawDataLength;
-
- private Path path;
-
- public CompressAwarePath(Path path, long rawDataLength) {
- this.path = path;
- this.rawDataLength = rawDataLength;
- }
-
- public long getRawDataLength() {
- return rawDataLength;
- }
-
- public Path getPath() {
- return path;
- }
- }
}
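
With CompressAwarePath removed, MergeManagerImpl tracks on-disk outputs as plain Paths, and byte counts come from the filesystem instead of the writer's recorded raw length. A condensed sketch of the restored bookkeeping; sizeOnDisk is a hypothetical helper wrapping the loop from the OnDiskMerger hunk above:

  import java.io.IOException;
  import java.util.List;
  import java.util.Set;
  import java.util.TreeSet;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  // On-disk map outputs are registered by Path alone again.
  Set<Path> onDiskMapOutputs = new TreeSet<Path>();

  // Hypothetical helper: sums on-disk (post-codec) file lengths, as the
  // restored OnDiskMerger.merge() does when estimating output size.
  static long sizeOnDisk(FileSystem localFS, List<Path> inputs)
      throws IOException {
    long approxOutputSize = 0;
    for (Path file : inputs) {
      approxOutputSize += localFS.getFileStatus(file).getLen();
    }
    return approxOutputSize;
  }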
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java?rev=1439561&r1=1439560&r2=1439561&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java Mon Jan 28 18:58:34 2013
@@ -37,7 +37,6 @@ import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
@InterfaceAudience.Private
@InterfaceStability.Unstable
@@ -113,9 +112,7 @@ class OnDiskMapOutput<K, V> extends MapO
@Override
public void commit() throws IOException {
localFS.rename(tmpOutputPath, outputPath);
- CompressAwarePath compressAwarePath = new CompressAwarePath(outputPath,
- getSize());
- merger.closeOnDiskFile(compressAwarePath);
+ merger.closeOnDiskFile(outputPath);
}
@Override