You are viewing a plain-text version of this content. The canonical link for it (the word "here" in the original) was a hyperlink that did not survive the plain-text conversion.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/12/02 20:11:44 UTC
svn commit: r722574 - in /hadoop/core/branches/branch-0.19: CHANGES.txt
conf/hadoop-default.xml src/mapred/org/apache/hadoop/mapred/MapTask.java
src/mapred/org/apache/hadoop/mapred/Merger.java
src/mapred/org/apache/hadoop/mapred/ReduceTask.java
Author: cdouglas
Date: Tue Dec 2 11:11:44 2008
New Revision: 722574
URL: http://svn.apache.org/viewvc?rev=722574&view=rev
Log:
HADOOP-4714. Report status between merges and make the number of records
between progress reports configurable. Contributed by Jothi Padmanabhan.
Modified:
hadoop/core/branches/branch-0.19/CHANGES.txt
hadoop/core/branches/branch-0.19/conf/hadoop-default.xml
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/MapTask.java
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/Merger.java
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=722574&r1=722573&r2=722574&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Tue Dec 2 11:11:44 2008
@@ -1042,6 +1042,9 @@
HADOOP-4635. Fix a memory leak in fuse dfs. (pete wyckoff via mahadev)
+ HADOOP-4714. Report status between merges and make the number of records
+ between progress reports configurable. (Jothi Padmanabhan via cdouglas)
+
Release 0.18.2 - 2008-11-03
BUG FIXES
Modified: hadoop/core/branches/branch-0.19/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/conf/hadoop-default.xml?rev=722574&r1=722573&r2=722574&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/conf/hadoop-default.xml (original)
+++ hadoop/core/branches/branch-0.19/conf/hadoop-default.xml Tue Dec 2 11:11:44 2008
@@ -1531,4 +1531,12 @@
</description>
</property>
+<property>
+ <name>mapred.merge.recordsBeforeProgress</name>
+ <value>10000</value>
+ <description> The number of records to process during merge before
+ sending a progress notification to the TaskTracker.
+ </description>
+</property>
+
</configuration>
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=722574&r1=722573&r2=722574&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/MapTask.java Tue Dec 2 11:11:44 2008
@@ -1267,7 +1267,7 @@
Writer<K, V> writer =
new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
if (null == combinerClass || numSpills < minSpillsForCombine) {
- Merger.writeFile(kvIter, writer, reporter);
+ Merger.writeFile(kvIter, writer, reporter, job);
} else {
combineCollector.setWriter(writer);
combineAndSpill(kvIter, combineInputCounter);
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/Merger.java?rev=722574&r1=722573&r2=722574&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/Merger.java Tue Dec 2 11:11:44 2008
@@ -43,8 +43,6 @@
class Merger {
private static final Log LOG = LogFactory.getLog(Merger.class);
- private static final long PROGRESS_BAR = 10000;
-
// Local directories
private static LocalDirAllocator lDirAlloc =
new LocalDirAllocator("mapred.local.dir");
@@ -103,13 +101,16 @@
public static <K extends Object, V extends Object>
void writeFile(RawKeyValueIterator records, Writer<K, V> writer,
- Progressable progressable)
+ Progressable progressable, Configuration conf)
throws IOException {
long recordCtr = 0;
+ long progressBar = conf.getLong("mapred.merge.recordsBeforeProgress",
+ 10000);
+
while(records.next()) {
writer.append(records.getKey(), records.getValue());
- if ((++recordCtr % PROGRESS_BAR) == 0) {
+ if ((recordCtr++ % progressBar) == 0) {
progressable.progress();
}
}
@@ -429,7 +430,7 @@
Writer<K, V> writer =
new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec);
- writeFile(this, writer, reporter);
+ writeFile(this, writer, reporter, conf);
writer.close();
//we finished one single level merge; now clean up the priority
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=722574&r1=722573&r2=722574&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Tue Dec 2 11:11:44 2008
@@ -2084,7 +2084,7 @@
final Writer writer = new Writer(job, fs, outputPath,
keyClass, valueClass, codec);
try {
- Merger.writeFile(rIter, writer, reporter);
+ Merger.writeFile(rIter, writer, reporter, job);
addToMapOutputFilesOnDisk(fs.getFileStatus(outputPath));
} catch (Exception e) {
if (null != outputPath) {
@@ -2395,7 +2395,7 @@
true, ioSortFactor, tmpDir,
conf.getOutputKeyComparator(), reporter);
- Merger.writeFile(iter, writer, reporter);
+ Merger.writeFile(iter, writer, reporter, conf);
writer.close();
} catch (Exception e) {
localFileSys.delete(outputPath, true);
@@ -2493,7 +2493,7 @@
conf.getOutputKeyComparator(), reporter);
if (null == combinerClass) {
- Merger.writeFile(rIter, writer, reporter);
+ Merger.writeFile(rIter, writer, reporter, conf);
} else {
combineCollector.setWriter(writer);
combineAndSpill(rIter, reduceCombineInputCounter);