You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/07/30 00:41:33 UTC

svn commit: r226416 - in /lucene/nutch/branches/mapred/src: java/org/apache/nutch/crawl/ java/org/apache/nutch/io/ java/org/apache/nutch/mapred/ java/org/apache/nutch/mapred/lib/ java/org/apache/nutch/util/ test/org/apache/nutch/fs/

Author: cutting
Date: Fri Jul 29 15:41:07 2005
New Revision: 226416

URL: http://svn.apache.org/viewcvs?rev=226416&view=rev
Log:
Pass a Reporter to Mapper & Reducer so that applications can set a status string, plus a few other fixes.

Added:
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Reporter.java
Modified:
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/LinkDb.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/CombiningCollector.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputLocation.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunnable.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunner.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Mapper.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Reducer.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Task.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/IdentityMapper.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/IdentityReducer.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/InverseMapper.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/LongSumReducer.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/RegexMapper.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/TokenCountMapper.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/util/Progress.java
    lucene/nutch/branches/mapred/src/test/org/apache/nutch/fs/TestNutchFileSystem.java

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java Fri Jul 29 15:41:07 2005
@@ -21,6 +21,7 @@
 import java.io.IOException;
 
 import org.apache.nutch.io.*;
+import org.apache.nutch.util.*;
 import org.apache.nutch.mapred.*;
 
 /** Merge new page entries with existing entries. */
@@ -32,7 +33,8 @@
   }
 
   public void reduce(WritableComparable key, Iterator values,
-                     OutputCollector output) throws IOException {
+                     OutputCollector output, Reporter reporter)
+    throws IOException {
 
     CrawlDatum highest = null;
     CrawlDatum old = null;

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java Fri Jul 29 15:41:07 2005
@@ -53,6 +53,7 @@
 
   private RecordReader input;
   private OutputCollector output;
+  private Reporter reporter;
 
   private int activeThreads;
   private int maxRedirect;
@@ -193,21 +194,21 @@
 
   public Fetcher(NutchConf conf) { super(conf); }
 
-  private synchronized void updateStatus(int bytesInPage) {
+  private synchronized void updateStatus(int bytesInPage) throws IOException {
     pages++;
     bytes += bytesInPage;
 
     if ((pages % 100) == 0) {             // show status every 100pp
       long elapsed = (System.currentTimeMillis() - start)/1000;
-      LOG.info( "status: "
-                + pages + " pages, "
-                + errors + " errors, "
-                + bytes + " bytes, "
-                + elapsed + " seconds");
-      LOG.info("status: "
-               + ((float)pages)/elapsed+" pages/s, "
-               + ((((float)bytes)*8)/1024)/elapsed+" kb/s, "
-               + ((float)bytes)/pages+" bytes/page");
+      String line1 =
+        pages+" pages, "+errors+" errors, "+bytes+" bytes, "+elapsed+" secs";
+      String line2 = 
+        + ((float)pages)/elapsed+" pages/s, "
+        + ((((float)bytes)*8)/1024)/elapsed+" kb/s, "
+        + ((float)bytes)/pages+" bytes/page";
+      LOG.info( "status: "+line1);
+      LOG.info( "status: "+line2);
+      reporter.setStatus(line2);
     }
   }
 
@@ -218,11 +219,12 @@
     }
   }
 
-  public void run(RecordReader input, OutputCollector output)
-    throws IOException {
+  public void run(RecordReader input, OutputCollector output,
+                  Reporter reporter) throws IOException {
 
     this.input = input;
     this.output = output;
+    this.reporter = reporter;
 			
     this.maxRedirect = getConf().getInt("http.redirect.max", 3);
     

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java Fri Jul 29 15:41:07 2005
@@ -50,7 +50,8 @@
 
     /** Select & invert subset due for fetch. */
     public void map(WritableComparable key, Writable value,
-                    OutputCollector output) throws IOException {
+                    OutputCollector output, Reporter reporter)
+      throws IOException {
       CrawlDatum crawlDatum = (CrawlDatum)value;
 
       if (crawlDatum.getStatus() == CrawlDatum.STATUS_DB_GONE)
@@ -82,7 +83,9 @@
 
     /** Collect until limit is reached. */
     public void reduce(WritableComparable key, Iterator values,
-                       OutputCollector output) throws IOException {
+                       OutputCollector output, Reporter reporter)
+      throws IOException {
+
       while (values.hasNext() && ++count < limit) {
         output.collect(key, (Writable)values.next());
       }

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java Fri Jul 29 15:41:07 2005
@@ -124,7 +124,8 @@
   }
 
   public void reduce(WritableComparable key, Iterator values,
-                     OutputCollector output) throws IOException {
+                     OutputCollector output, Reporter reporter)
+    throws IOException {
     Inlinks inlinks = null;
     CrawlDatum crawlDatum = null;
     ParseData parseData = null;

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java Fri Jul 29 15:41:07 2005
@@ -43,7 +43,8 @@
     }
 
     public void map(WritableComparable key, Writable val,
-                    OutputCollector output) throws IOException {
+                    OutputCollector output, Reporter reporter)
+      throws IOException {
       UTF8 value = (UTF8)val;
       String url = value.toString();              // value is line of text
       try {
@@ -66,7 +67,8 @@
     public void configure(JobConf job) {}
 
     public void reduce(WritableComparable key, Iterator values,
-                       OutputCollector output) throws IOException {
+                       OutputCollector output, Reporter reporter)
+      throws IOException {
       output.collect(key, (Writable)values.next()); // just collect first value
     }
   }

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/LinkDb.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/LinkDb.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/LinkDb.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/LinkDb.java Fri Jul 29 15:41:07 2005
@@ -51,7 +51,8 @@
   }
 
   public void map(WritableComparable key, Writable value,
-                  OutputCollector output) throws IOException {
+                  OutputCollector output, Reporter reporter)
+    throws IOException {
     String fromUrl = key.toString();
     ParseData parseData = (ParseData)value;
     Outlink[] outlinks = parseData.getOutlinks();
@@ -69,7 +70,8 @@
   }
 
   public void reduce(WritableComparable key, Iterator values,
-                     OutputCollector output) throws IOException {
+                     OutputCollector output, Reporter reporter)
+    throws IOException {
     Inlinks result = null;
     while (values.hasNext()) {
       Inlinks inlinks = (Inlinks)values.next();

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java Fri Jul 29 15:41:07 2005
@@ -54,7 +54,8 @@
   }
 
   public void map(WritableComparable key, Writable value,
-                  OutputCollector output) throws IOException {
+                  OutputCollector output, Reporter reporter)
+    throws IOException {
     Content content = (Content)value;
 
     Parse parse = null;
@@ -77,7 +78,8 @@
   }
 
   public void reduce(WritableComparable key, Iterator values,
-                     OutputCollector output) throws IOException {
+                     OutputCollector output, Reporter reporter)
+    throws IOException {
     output.collect(key, (Writable)values.next()); // collect first value
   }
 

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java Fri Jul 29 15:41:07 2005
@@ -76,9 +76,6 @@
       throws IOException {
       this.nfs = nfs;
       this.target = new File(name);
-      if (nfs.exists(target)) {
-        throw new IOException("already exists: " + target);
-      }
       init(new NFSDataOutputStream(nfs.create(target)),
            keyClass, valClass);
     }

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/CombiningCollector.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/CombiningCollector.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/CombiningCollector.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/CombiningCollector.java Fri Jul 29 15:41:07 2005
@@ -21,7 +21,7 @@
 import java.util.*;
 
 import org.apache.nutch.io.*;
-import org.apache.nutch.util.NutchConf;
+import org.apache.nutch.util.*;
 
 /** Implements partial value reduction during mapping.  This can minimize the
  * size of intermediate data.  Buffers a list of values for each unique key,
@@ -37,10 +37,13 @@
   private JobConf job;
   private OutputCollector out;
   private Reducer combiner;
+  private Reporter reporter;
 
-  public CombiningCollector(JobConf job, OutputCollector out) {
+  public CombiningCollector(JobConf job, OutputCollector out,
+                            Reporter reporter) {
     this.job = job;
     this.out = out;
+    this.reporter = reporter;
     this.combiner = (Reducer)job.newInstance(job.getCombinerClass());
     this.keyToValues = new TreeMap
       ((Comparator)job.newInstance(job.getOutputKeyComparatorClass()));
@@ -72,7 +75,7 @@
       Map.Entry pair = (Map.Entry)pairs.next();
       combiner.reduce((WritableComparable)pair.getKey(),
                       ((ArrayList)pair.getValue()).iterator(),
-                      out);
+                      out, reporter);
     }
     keyToValues.clear();
     count = 0;

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputLocation.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputLocation.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputLocation.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputLocation.java Fri Jul 29 15:41:07 2005
@@ -61,4 +61,8 @@
     this.port = in.readInt();
   }
 
+  public String toString() {
+    return mapTaskId+"@"+host+":"+port;
+  }
+
 }

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunnable.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunnable.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunnable.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunnable.java Fri Jul 29 15:41:07 2005
@@ -28,6 +28,6 @@
    * @param input the {@link RecordReader} with input key/value pairs.
    * @param output the {@link OutputCollector} for mapped key/value pairs.
    */
-  void run(RecordReader input, OutputCollector output)
+  void run(RecordReader input, OutputCollector output, Reporter reporter)
     throws IOException;
 }

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunner.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunner.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunner.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunner.java Fri Jul 29 15:41:07 2005
@@ -35,7 +35,8 @@
     this.inputValueClass = job.getInputValueClass();
   }
 
-  public void run(RecordReader input, OutputCollector output)
+  public void run(RecordReader input, OutputCollector output,
+                  Reporter reporter)
     throws IOException {
     while (true) {
       // allocate new key & value instances
@@ -48,7 +49,7 @@
         return;
 
       // map pair to output
-      mapper.map(key, value, output);
+      mapper.map(key, value, output, reporter);
     }
   }
 

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java Fri Jul 29 15:41:07 2005
@@ -81,10 +81,11 @@
         };
 
       OutputCollector collector = partCollector;
+      Reporter reporter = getReporter(umbilical, getProgress());
 
       boolean combining = job.getCombinerClass() != null;
       if (combining) {                            // add combining collector
-        collector = new CombiningCollector(job, partCollector);
+        collector = new CombiningCollector(job, partCollector, reporter);
       }
 
       final RecordReader rawIn =                  // open input
@@ -110,7 +111,7 @@
         (MapRunnable)job.newInstance(job.getMapRunnerClass());
 
       try {
-        runner.run(in, collector);                // run the map
+        runner.run(in, collector, reporter);      // run the map
 
         if (combining) {                          // flush combiner
           ((CombiningCollector)collector).flush();

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Mapper.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Mapper.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Mapper.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Mapper.java Fri Jul 29 15:41:07 2005
@@ -36,6 +36,7 @@
    * @param value the values
    * @param output collects mapped keys and values
    */
-  void map(WritableComparable key, Writable value, OutputCollector output)
+  void map(WritableComparable key, Writable value,
+           OutputCollector output, Reporter reporter)
     throws IOException;
 }

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java Fri Jul 29 15:41:07 2005
@@ -33,10 +33,12 @@
   private int partition;
   private boolean sortComplete;
 
-  private Progress copyPhase = getTaskProgress().addPhase();
-  private Progress appendPhase = getTaskProgress().addPhase();
-  private Progress sortPhase  = getTaskProgress().addPhase();
-  private Progress reducePhase = getTaskProgress().addPhase();
+  { getProgress().setStatus("reduce"); }
+
+  private Progress copyPhase = getProgress().addPhase("copy");
+  private Progress appendPhase = getProgress().addPhase("append");
+  private Progress sortPhase  = getProgress().addPhase("sort");
+  private Progress reducePhase = getProgress().addPhase("reduce");
 
   public ReduceTask() {}
 
@@ -165,25 +167,27 @@
       new SequenceFile.Writer(lfs, file, keyClass, valueClass);
     try {
       // append all input files into a single input file
-      WritableComparable key = (WritableComparable)job.newInstance(keyClass);
-      Writable value = (Writable)job.newInstance(valueClass);
-      
       for (int i = 0; i < mapTaskIds.length; i++) {
         appendPhase.addPhase();                 // one per file
       }
       
+      DataOutputBuffer buffer = new DataOutputBuffer();
+
       for (int i = 0; i < mapTaskIds.length; i++) {
         File partFile =
           MapOutputFile.getInputFile(mapTaskIds[i], getTaskId());
         float progPerByte = 1.0f / lfs.getLength(partFile);
         Progress phase = appendPhase.phase();
+        phase.setStatus(partFile.toString());
         SequenceFile.Reader in =
           new SequenceFile.Reader(lfs, partFile.toString());
         try {
-          while(in.next(key, value)) {
-            writer.append(key, value);
+          int keyLen;
+          while((keyLen = in.next(buffer)) > 0) {
+            writer.append(buffer.getData(), 0, buffer.getLength(), keyLen);
             phase.set(in.getPosition()*progPerByte);
             reportProgress(umbilical);
+            buffer.reset();
           }
         } finally {
           in.close();
@@ -247,11 +251,12 @@
     
     // apply reduce function
     SequenceFile.Reader in = new SequenceFile.Reader(lfs, sortedFile);
+    Reporter reporter = getReporter(umbilical, getProgress());
     long length = lfs.getLength(new File(sortedFile));
     try {
       ValuesIterator values = new ValuesIterator(in, length, umbilical);
       while (values.more()) {
-        reducer.reduce(values.getKey(), values, collector);
+        reducer.reduce(values.getKey(), values, collector, reporter);
         values.nextKey();
       }
 

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java Fri Jul 29 15:41:07 2005
@@ -38,7 +38,7 @@
     ReduceTask task = ((ReduceTask)getTask());
     MapOutputFile.removeAll(task.getTaskId());    // cleanup from failures
     String[] mapTaskIds = task.getMapTaskIds();
-    final Progress copyPhase = getTask().getTaskProgress().phase();
+    final Progress copyPhase = getTask().getProgress().phase();
 
     // we need input from every map task
     HashSet needed = new HashSet();
@@ -89,19 +89,17 @@
 
         getTask().reportProgress(getTracker());
         try {
-          LOG.info("Copying "+loc.getMapTaskId()+" from "+addr);
+          copyPhase.phase().setStatus(loc.toString());
           
           client.getFile(loc.getMapTaskId(), task.getTaskId(),
                          new IntWritable(task.getPartition()));
           
           needed.remove(loc.getMapTaskId());     // success: remove from needed
           
-          LOG.info("Copy complete: "+loc.getMapTaskId()+" from "+addr);
-          
           copyPhase.startNextPhase();
           
         } catch (IOException e) {                 // failed: try again later
-          LOG.info("Copy failed: "+loc.getMapTaskId()+" from "+addr);
+          LOG.warning("copy failed: "+loc.getMapTaskId()+" from "+addr);
           
         } finally {
           MapOutputFile.setProgressReporter(null);
@@ -115,7 +113,7 @@
 
   /** Delete all of the temporary map output files. */
   public void close() throws IOException {
-    LOG.info("Task "+getTask()+" done; removing files.");
+    getTask().getProgress().setStatus("closed");
     MapOutputFile.removeAll(getTask().getTaskId());
   }
 

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Reducer.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Reducer.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Reducer.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Reducer.java Fri Jul 29 15:41:07 2005
@@ -35,6 +35,7 @@
    * @param values the values to combine
    * @param output to collect combined values
    */
-  void reduce(WritableComparable key, Iterator values, OutputCollector output)
+  void reduce(WritableComparable key, Iterator values,
+              OutputCollector output, Reporter reporter)
     throws IOException;
 }

Added: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Reporter.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Reporter.java?rev=226416&view=auto
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Reporter.java (added)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Reporter.java Fri Jul 29 15:41:07 2005
@@ -0,0 +1,28 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.mapred;
+
+import java.io.IOException;
+
+/** Passed to application code to permit alteration of status. */
+public interface Reporter {
+  /** Alter the application's status description.
+   *
+   * @param status a brief description of the current status
+   */
+  void setStatus(String status) throws IOException;
+}

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Task.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Task.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Task.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Task.java Fri Jul 29 15:41:07 2005
@@ -83,7 +83,17 @@
   private transient long nextProgressTime =
     System.currentTimeMillis() + PROGRESS_INTERVAL;
 
-  public Progress getTaskProgress() { return taskProgress; }
+  public Progress getProgress() { return taskProgress; }
+
+  public Reporter getReporter(final TaskUmbilicalProtocol umbilical,
+                              final Progress progress) throws IOException {
+    return new Reporter() {
+        public void setStatus(String status) throws IOException {
+          progress.setStatus(status);
+          reportProgress(umbilical);
+        }
+      };
+  }
 
   public void reportProgress(TaskUmbilicalProtocol umbilical, float progress)
     throws IOException {
@@ -97,7 +107,9 @@
     if (now > nextProgressTime)  {
       synchronized (this) {
         nextProgressTime = now + PROGRESS_INTERVAL;
-        umbilical.progress(getTaskId(), taskProgress.get(), "");
+        float progress = taskProgress.get();
+        String status = taskProgress.toString();
+        umbilical.progress(getTaskId(), progress, status);
       }
     }
   }

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java Fri Jul 29 15:41:07 2005
@@ -355,8 +355,8 @@
         /**
          * The task is reporting its progress
          */
-        public synchronized void reportProgress(float p) {
-            LOG.info("Progress for task " + task.getTaskId() + " is " + p);
+        public synchronized void reportProgress(float p, String state) {
+            LOG.info(task.getTaskId()+" "+p+"% "+state);
             this.progress = p;
             this.runstate = TaskStatus.RUNNING;
         }
@@ -469,7 +469,7 @@
      */
     public void progress(String taskid, float progress, String state) throws IOException {
         TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
-        tip.reportProgress(progress);
+        tip.reportProgress(progress, state);
     }
 
     /**

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/IdentityMapper.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/IdentityMapper.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/IdentityMapper.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/IdentityMapper.java Fri Jul 29 15:41:07 2005
@@ -21,6 +21,7 @@
 import org.apache.nutch.mapred.Mapper;
 import org.apache.nutch.mapred.OutputCollector;
 import org.apache.nutch.mapred.JobConf;
+import org.apache.nutch.mapred.Reporter;
 
 import org.apache.nutch.io.Writable;
 import org.apache.nutch.io.WritableComparable;
@@ -33,7 +34,8 @@
   /** The identity function.  Input key/value pair is written directly to
    * output.*/
   public void map(WritableComparable key, Writable val,
-                  OutputCollector output) throws IOException {
+                  OutputCollector output, Reporter reporter)
+    throws IOException {
     output.collect(key, val);
   }
 

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/IdentityReducer.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/IdentityReducer.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/IdentityReducer.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/IdentityReducer.java Fri Jul 29 15:41:07 2005
@@ -23,6 +23,7 @@
 import org.apache.nutch.mapred.Reducer;
 import org.apache.nutch.mapred.OutputCollector;
 import org.apache.nutch.mapred.JobConf;
+import org.apache.nutch.mapred.Reporter;
 
 import org.apache.nutch.io.Writable;
 import org.apache.nutch.io.WritableComparable;
@@ -33,8 +34,9 @@
   public void configure(JobConf job) {}
 
   /** Writes all keys and values directly to output. */
-  public void reduce (WritableComparable key, Iterator values,
-                      OutputCollector output) throws IOException {
+  public void reduce(WritableComparable key, Iterator values,
+                     OutputCollector output, Reporter reporter)
+    throws IOException {
     while (values.hasNext()) {
       output.collect(key, (Writable)values.next());
     }

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/InverseMapper.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/InverseMapper.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/InverseMapper.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/InverseMapper.java Fri Jul 29 15:41:07 2005
@@ -21,10 +21,12 @@
 import org.apache.nutch.mapred.Mapper;
 import org.apache.nutch.mapred.OutputCollector;
 import org.apache.nutch.mapred.JobConf;
+import org.apache.nutch.mapred.Reporter;
 
 import org.apache.nutch.io.WritableComparable;
 import org.apache.nutch.io.Writable;
 
+
 /** A {@link Mapper} that swaps keys and values. */
 public class InverseMapper implements Mapper {
 
@@ -32,7 +34,8 @@
 
   /** The inverse function.  Input keys and values are swapped.*/
   public void map(WritableComparable key, Writable value,
-                  OutputCollector output) throws IOException {
+                  OutputCollector output, Reporter reporter)
+    throws IOException {
     output.collect((WritableComparable)value, key);
   }
 }

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/LongSumReducer.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/LongSumReducer.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/LongSumReducer.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/LongSumReducer.java Fri Jul 29 15:41:07 2005
@@ -22,19 +22,21 @@
 import org.apache.nutch.mapred.Reducer;
 import org.apache.nutch.mapred.OutputCollector;
 import org.apache.nutch.mapred.JobConf;
+import org.apache.nutch.mapred.Reporter;
+
 
 import org.apache.nutch.io.Writable;
 import org.apache.nutch.io.WritableComparable;
 import org.apache.nutch.io.LongWritable;
 
-
 /** A {@link Reducer} that sums long values. */
 public class LongSumReducer implements Reducer {
 
   public void configure(JobConf job) {}
 
   public void reduce(WritableComparable key, Iterator values,
-                     OutputCollector output) throws IOException {
+                     OutputCollector output, Reporter reporter)
+    throws IOException {
 
     // sum all values for this key
     long sum = 0;

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/RegexMapper.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/RegexMapper.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/RegexMapper.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/RegexMapper.java Fri Jul 29 15:41:07 2005
@@ -21,12 +21,14 @@
 import org.apache.nutch.mapred.Mapper;
 import org.apache.nutch.mapred.OutputCollector;
 import org.apache.nutch.mapred.JobConf;
+import org.apache.nutch.mapred.Reporter;
 
 import org.apache.nutch.io.WritableComparable;
 import org.apache.nutch.io.Writable;
 import org.apache.nutch.io.LongWritable;
 import org.apache.nutch.io.UTF8;
 
+
 import java.util.regex.Pattern;
 import java.util.regex.Matcher;
 
@@ -43,7 +45,8 @@
   }
 
   public void map(WritableComparable key, Writable value,
-                  OutputCollector output) throws IOException {
+                  OutputCollector output, Reporter reporter)
+    throws IOException {
     String text = ((UTF8)value).toString();
     Matcher matcher = pattern.matcher(text);
     while (matcher.find()) {

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/TokenCountMapper.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/TokenCountMapper.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/TokenCountMapper.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/TokenCountMapper.java Fri Jul 29 15:41:07 2005
@@ -22,12 +22,14 @@
 import org.apache.nutch.mapred.Mapper;
 import org.apache.nutch.mapred.OutputCollector;
 import org.apache.nutch.mapred.JobConf;
+import org.apache.nutch.mapred.Reporter;
 
 import org.apache.nutch.io.WritableComparable;
 import org.apache.nutch.io.Writable;
 import org.apache.nutch.io.LongWritable;
 import org.apache.nutch.io.UTF8;
 
+
 /** A {@link Mapper} that maps text values into <token,freq> pairs.  Uses
  * {@link StringTokenizer} to break text into tokens. */
 public class TokenCountMapper implements Mapper {
@@ -35,7 +37,8 @@
   public void configure(JobConf job) {}
 
   public void map(WritableComparable key, Writable value,
-                  OutputCollector output) throws IOException {
+                  OutputCollector output, Reporter reporter)
+    throws IOException {
     // get input text
     long position = ((LongWritable)key).get();    // key is position in file
     String text = ((UTF8)value).toString();       // value is line of text

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/util/Progress.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/util/Progress.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/util/Progress.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/util/Progress.java Fri Jul 29 15:41:07 2005
@@ -24,6 +24,7 @@
  * sub-phases are created by calling {@link #addPhase()}.
  */
 public class Progress {
+  private String status = "";
   private float progress;
   private int currentPhase;
   private ArrayList phases = new ArrayList();
@@ -33,6 +34,13 @@
   /** Creates a new root node. */
   public Progress() {}
 
+  /** Adds a named node to the tree. */
+  public Progress addPhase(String status) {
+    Progress phase = addPhase();
+    phase.setStatus(status);
+    return phase;
+  }
+
   /** Adds a node to the tree. */
   public Progress addPhase() {
     Progress phase = new Progress();
@@ -79,9 +87,30 @@
   private float getInternal() {
     int phaseCount = phases.size();
     if (phaseCount != 0) {
-      return progressPerPhase*(currentPhase + phase().getInternal());
+      float subProgress =
+        currentPhase < phaseCount ? phase().getInternal() : 0.0f;
+      return progressPerPhase*(currentPhase + subProgress);
     } else {
       return progress;
     }
   }
+
+  public void setStatus(String status) {
+    this.status = status;
+  }
+
+  public String toString() {
+    StringBuffer result = new StringBuffer();
+    toString(result);
+    return result.toString();
+  }
+
+  private void toString(StringBuffer buffer) {
+    buffer.append(status);
+    if (phases.size() != 0 && currentPhase < phases.size()) {
+      buffer.append(" > ");
+      phase().toString(buffer);
+    }
+  }
+
 }

Modified: lucene/nutch/branches/mapred/src/test/org/apache/nutch/fs/TestNutchFileSystem.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/test/org/apache/nutch/fs/TestNutchFileSystem.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/test/org/apache/nutch/fs/TestNutchFileSystem.java (original)
+++ lucene/nutch/branches/mapred/src/test/org/apache/nutch/fs/TestNutchFileSystem.java Fri Jul 29 15:41:07 2005
@@ -116,13 +116,14 @@
     }
 
     public void map(WritableComparable key, Writable value,
-                    OutputCollector collector) throws IOException {
+                    OutputCollector collector, Reporter reporter)
+      throws IOException {
       String name = ((UTF8)key).toString();
       long size = ((LongWritable)value).get();
       long seed = Long.parseLong(name);
 
       random.setSeed(seed);
-      //LOG.info("writing: name="+name+" size="+size);
+      reporter.setStatus("creating " + name);
 
       OutputStream out = fs.create(new File(DATA_DIR, name));
 
@@ -134,12 +135,15 @@
           int length = (remains<=buffer.length) ? (int)remains : buffer.length;
           out.write(buffer, 0, length);
           written += length;
+          reporter.setStatus("writing "+name+"@"+written+"/"+size);
         }
       } finally {
         out.close();
       }
 
       collector.collect(new UTF8("bytes"), new LongWritable(written));
+
+      reporter.setStatus("wrote " + name);
     }
   }
 
@@ -188,36 +192,41 @@
     }
 
     public void map(WritableComparable key, Writable value,
-                    OutputCollector collector) throws IOException {
+                    OutputCollector collector, Reporter reporter)
+      throws IOException {
       String name = ((UTF8)key).toString();
       long size = ((LongWritable)value).get();
       long seed = Long.parseLong(name);
 
       random.setSeed(seed);
-      //LOG.info("reading: name="+name+" size="+size);
+      reporter.setStatus("opening " + name);
 
-      InputStream in = fs.open(new File(DATA_DIR, name));
+      DataInputStream in =
+        new DataInputStream(fs.open(new File(DATA_DIR, name)));
 
       long read = 0;
       try {
         while (read < size) {
           long remains = size - read;
-          int req = (remains<=buffer.length) ? (int)remains : buffer.length;
-          int got = in.read(buffer, 0, req);
-          read += got;
-          assertEquals(got, req);
+          int n = (remains<=buffer.length) ? (int)remains : buffer.length;
+          in.readFully(buffer, 0, n);
+          read += n;
           random.nextBytes(check);
-          if (got != buffer.length) {
-            Arrays.fill(buffer, got, buffer.length, (byte)0);
-            Arrays.fill(check, got, check.length, (byte)0);
+          if (n != buffer.length) {
+            Arrays.fill(buffer, n, buffer.length, (byte)0);
+            Arrays.fill(check, n, check.length, (byte)0);
           }
           assertTrue(Arrays.equals(buffer, check));
+          reporter.setStatus("reading "+name+"@"+read+"/"+size);
+
         }
       } finally {
         in.close();
       }
 
       collector.collect(new UTF8("bytes"), new LongWritable(read));
+
+      reporter.setStatus("read " + name);
     }
   }