You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/07/28 20:15:23 UTC
svn commit: r225837 - in /lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred: MapOutputFile.java ReduceTask.java TaskRunner.java
Author: cutting
Date: Thu Jul 28 11:15:22 2005
New Revision: 225837
URL: http://svn.apache.org/viewcvs?rev=225837&view=rev
Log:
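Improved progress reporting during reduce:
- MapOutputFile: report copy progress as the fraction of bytes read, (length-unread)/length. The previous expression, length-unread/(float)length, divided only "unread" by the length because of operator precedence.
- ReduceTask: append the map outputs into the single input file before starting the sort-progress thread, with one sub-phase per map output whose progress is set from the reader position, and report progress after each appended record.
- TaskRunner: add a commented-out -Xrunhprof option for profiling child JVMs.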
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskRunner.java
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java?rev=225837&r1=225836&r2=225837&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java Thu Jul 28 11:15:22 2005
@@ -112,6 +112,7 @@
// read the length-prefixed file content into a local file
File file = getInputFile(mapTaskId, reduceTaskId);
long length = in.readLong();
+ float progPerByte = 1.0f / length;
long unread = length;
file.getParentFile().mkdirs(); // make directory
OutputStream out = new FileOutputStream(file);
@@ -123,7 +124,7 @@
out.write(buffer, 0, bytesToRead);
unread -= bytesToRead;
if (reporter != null) {
- reporter.progress(length-unread/(float)length);
+ reporter.progress((length-unread)*progPerByte);
}
}
} finally {
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java?rev=225837&r1=225836&r2=225837&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java Thu Jul 28 11:15:22 2005
@@ -158,6 +158,45 @@
copyPhase.complete(); // copy is already complete
+ // open a file to collect map output
+ File taskDir = new File(LOCAL_DIR, getTaskId());
+ String file = new File(taskDir, "all.in").toString();
+ SequenceFile.Writer writer =
+ new SequenceFile.Writer(lfs, file, keyClass, valueClass);
+ try {
+ // append all input files into a single input file
+ WritableComparable key = (WritableComparable)job.newInstance(keyClass);
+ Writable value = (Writable)job.newInstance(valueClass);
+
+ for (int i = 0; i < mapTaskIds.length; i++) {
+ appendPhase.addPhase(); // one per file
+ }
+
+ for (int i = 0; i < mapTaskIds.length; i++) {
+ File partFile =
+ MapOutputFile.getInputFile(mapTaskIds[i], getTaskId());
+ float progPerByte = 1.0f / lfs.getLength(partFile);
+ Progress phase = appendPhase.phase();
+ SequenceFile.Reader in =
+ new SequenceFile.Reader(lfs, partFile.toString());
+ try {
+ while(in.next(key, value)) {
+ writer.append(key, value);
+ phase.set(in.getPosition()*progPerByte);
+ reportProgress(umbilical);
+ }
+ } finally {
+ in.close();
+ }
+ phase.complete();
+ }
+
+ } finally {
+ writer.close();
+ }
+
+ appendPhase.complete(); // append is complete
+
// spawn a thread to give sort progress heartbeats
Thread sortProgress = new Thread() {
public void run() {
@@ -175,44 +214,10 @@
};
sortProgress.setName("Sort progress reporter for task "+getTaskId());
- File taskDir = new File(LOCAL_DIR, getTaskId());
- String file = new File(taskDir, "all.in").toString();
String sortedFile = file+".sorted";
try {
sortProgress.start();
-
- // open a file to collect map output
- SequenceFile.Writer writer =
- new SequenceFile.Writer(lfs, file, keyClass, valueClass);
- try {
- // append all input files into a single input file
- WritableComparable key = (WritableComparable)job.newInstance(keyClass);
- Writable value = (Writable)job.newInstance(valueClass);
-
- for (int i = 0; i < mapTaskIds.length; i++) {
- appendPhase.addPhase(); // one per file
- }
-
- for (int i = 0; i < mapTaskIds.length; i++) {
- String partFile =
- MapOutputFile.getInputFile(mapTaskIds[i], getTaskId()).toString();
- SequenceFile.Reader in = new SequenceFile.Reader(lfs, partFile);
- try {
- while(in.next(key, value)) {
- writer.append(key, value);
- }
- } finally {
- in.close();
- }
- appendPhase.startNextPhase();
- }
-
- } finally {
- writer.close();
- }
-
- appendPhase.complete(); // append is complete
// sort the input file
WritableComparator comparator =
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskRunner.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskRunner.java?rev=225837&r1=225836&r2=225837&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskRunner.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskRunner.java Thu Jul 28 11:15:22 2005
@@ -90,6 +90,7 @@
// run java
runChild(new String[] {
jvm.toString(),
+ //"-Xrunhprof:cpu=samples,file="+t.getTaskId()+".prof",
"-Xmx"+job.get("mapred.child.heap.size", "200m"),
"-cp", classPath.toString(),
TaskTracker.Child.class.getName(), // main is Child