You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/07/28 00:46:54 UTC

svn commit: r225647 - in /lucene/nutch/branches/mapred/src/java/org/apache/nutch: mapred/MapOutputFile.java mapred/MapTask.java mapred/ReduceTask.java mapred/ReduceTaskRunner.java mapred/Task.java util/Progress.java

Author: cutting
Date: Wed Jul 27 15:46:48 2005
New Revision: 225647

URL: http://svn.apache.org/viewcvs?rev=225647&view=rev
Log:
Report progress every second, rather than every 100th of the task, so that progress can be used as a heartbeat, to tell if task is hung.

Added:
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/util/Progress.java
Modified:
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Task.java

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java?rev=225647&r1=225646&r2=225647&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java Wed Jul 27 15:46:48 2005
@@ -31,6 +31,17 @@
   private String reduceTaskId;
   private int partition;
   
+  /** Callback that permits reporting of file copy progress. */
+  public static interface ProgressReporter {
+    void progress(float progress) throws IOException;
+  }
+
+  private static final ThreadLocal REPORTERS = new ThreadLocal();
+  /** Sets the reporter for the calling thread; null removes it. */
+  public static void setProgressReporter(ProgressReporter reporter) {
+    REPORTERS.set(reporter);
+  }
+
   /** Create a local map output file name.
    * @param mapTaskId a map task id
    * @param partition a reduce partition
@@ -96,18 +107,24 @@
     this.reduceTaskId = UTF8.readString(in);
     this.partition = in.readInt();
 
+    ProgressReporter reporter = (ProgressReporter)REPORTERS.get();
+
     // read the length-prefixed file content into a local file
     File file = getInputFile(mapTaskId, reduceTaskId);
     long length = in.readLong();
+    long unread = length;
     file.getParentFile().mkdirs();                // make directory
     OutputStream out = new FileOutputStream(file);
     try {
       byte[] buffer = new byte[8192];
-      while (length > 0) {
-          int bytesToRead = Math.min((int) length, buffer.length);
+      while (unread > 0) {
+          int bytesToRead = Math.min((int) unread, buffer.length);
           in.readFully(buffer, 0, bytesToRead);
           out.write(buffer, 0, bytesToRead);
-          length -= bytesToRead;
+          unread -= bytesToRead;
+          if (reporter != null) {
+            reporter.progress((length-unread)/(float)length); // fraction read
+          }
       }
     } finally {
       out.close();

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java?rev=225647&r1=225646&r2=225647&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java Wed Jul 27 15:46:48 2005
@@ -91,18 +91,14 @@
         job.getInputFormat().getRecordReader(NutchFileSystem.get(),split,job);
 
       RecordReader in = new RecordReader() {      // wrap in progress reporter
-          private float end = (float)split.getLength();
-          private float lastProgress = 0.0f;
+          private float perByte = 1.0f /(float)split.getLength();
 
           public synchronized boolean next(Writable key, Writable value)
             throws IOException {
 
             float progress =                        // compute progress
-              (float)Math.min((rawIn.getPos()-split.getStart())/end, 1.0f);
-            if ((progress - lastProgress) > 0.01f)  { // 100 progress reports
-              umbilical.progress(getTaskId(), new FloatWritable(progress));
-              lastProgress = progress;
-            }
+              (float)Math.min((rawIn.getPos()-split.getStart())*perByte, 1.0f);
+            reportProgress(umbilical, progress);
 
             return rawIn.next(key, value);
           }
@@ -120,7 +116,7 @@
           ((CombiningCollector)collector).flush();
         }
 
-        umbilical.progress(getTaskId(), new FloatWritable(1.0f)); // done
+        reportProgress(umbilical, 1.0f);          // done
 
       } finally {
         in.close();                               // close input

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java?rev=225647&r1=225646&r2=225647&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java Wed Jul 27 15:46:48 2005
@@ -31,6 +31,12 @@
 
   private String[] mapTaskIds;
   private int partition;
+  private volatile boolean sortComplete;  // polled by the sort progress thread
+
+  private Progress copyPhase = getTaskProgress().addPhase();
+  private Progress appendPhase = getTaskProgress().addPhase();
+  private Progress sortPhase  = getTaskProgress().addPhase();
+  private Progress reducePhase = getTaskProgress().addPhase();
 
   public ReduceTask() {}
 
@@ -71,15 +77,21 @@
   }
 
   /** Iterates values while keys match in sorted input. */
-  private static class ValuesIterator implements Iterator {
+  private class ValuesIterator implements Iterator {
     private SequenceFile.Reader in;               // input file
     private WritableComparable key;               // current key
     private Writable value;                       // current value
     private boolean hasNext;                      // more w/ this key
     private boolean more;                         // more in file
+    private float progPerByte;
+    private TaskUmbilicalProtocol umbilical;
 
-    public ValuesIterator (SequenceFile.Reader in) throws IOException {
+    public ValuesIterator (SequenceFile.Reader in, long length,
+                           TaskUmbilicalProtocol umbilical)
+      throws IOException {
       this.in = in;
+      this.progPerByte = 1.0f / (float)length;
+      this.umbilical = umbilical;
       getNext();
     }
 
@@ -114,6 +126,9 @@
     public WritableComparable getKey() { return key; }
 
     private void getNext() throws IOException {
+      reducePhase.set(in.getPosition()*progPerByte); // update progress
+      reportProgress(umbilical);
+
       Writable lastKey = key;                     // save previous key
       try {
         key = (WritableComparable)in.getKeyClass().newInstance();
@@ -134,56 +149,85 @@
     }
   }
 
-  public void run(JobConf job, TaskUmbilicalProtocol umbilical)
+  public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
     Class keyClass = job.getOutputKeyClass();
     Class valueClass = job.getOutputValueClass();
     Reducer reducer = (Reducer)job.newInstance(job.getReducerClass());
-        
-    umbilical.progress(getTaskId(), new FloatWritable(1.0f/3.0f));
-
-    // open a file to collect map output
     NutchFileSystem lfs = NutchFileSystem.getNamed("local");
+
+    copyPhase.complete();                         // copy is already complete
+
+    // spawn a thread to give sort progress heartbeats
+    Thread sortProgress = new Thread() {
+        public void run() {
+          while (!sortComplete) {
+            try {
+              reportProgress(umbilical);
+              Thread.sleep(PROGRESS_INTERVAL);
+            } catch (InterruptedException e) {
+              continue;
+            } catch (Throwable e) {
+              return;
+            }
+          }
+        }
+      };
+    sortProgress.setName("Sort progress reporter for task "+getTaskId());
+
     File taskDir = new File(LOCAL_DIR, getTaskId());
     String file = new File(taskDir, "all.in").toString();
-    SequenceFile.Writer writer =
-      new SequenceFile.Writer(lfs, file, keyClass, valueClass);
+    String sortedFile = file+".sorted";
+
     try {
-      // append all input files into a single input file
-      WritableComparable key = (WritableComparable)job.newInstance(keyClass);
-      Writable value = (Writable)job.newInstance(valueClass);
-
-      for (int i = 0; i < mapTaskIds.length; i++) {
-        String partFile =
-          MapOutputFile.getInputFile(mapTaskIds[i], getTaskId()).toString();
-        SequenceFile.Reader in = new SequenceFile.Reader(lfs, partFile);
-        try {
-          while(in.next(key, value)) {
-            writer.append(key, value);
+      sortProgress.start();
+
+      // open a file to collect map output
+      SequenceFile.Writer writer =
+        new SequenceFile.Writer(lfs, file, keyClass, valueClass);
+      try {
+        // append all input files into a single input file
+        WritableComparable key = (WritableComparable)job.newInstance(keyClass);
+        Writable value = (Writable)job.newInstance(valueClass);
+
+        for (int i = 0; i < mapTaskIds.length; i++) {
+          appendPhase.addPhase();                 // one per file
+        }
+
+        for (int i = 0; i < mapTaskIds.length; i++) {
+          String partFile =
+            MapOutputFile.getInputFile(mapTaskIds[i], getTaskId()).toString();
+          SequenceFile.Reader in = new SequenceFile.Reader(lfs, partFile);
+          try {
+            while(in.next(key, value)) {
+              writer.append(key, value);
+            }
+          } finally {
+            in.close();
           }
-        } finally {
-          in.close();
+          appendPhase.startNextPhase();
         }
+
+      } finally {
+        writer.close();
       }
-    } finally {
-      writer.close();
-    }
       
-    // sort the input file
-    String sortedFile = file+".sorted";
-    WritableComparator comparator = null;
-    try {
-      comparator =
+      appendPhase.complete();                     // append is complete
+
+      // sort the input file
+      WritableComparator comparator = 
         (WritableComparator)job.newInstance(job.getOutputKeyComparatorClass());
-    } catch (Exception e) {
-      throw new RuntimeException(e);
+    
+      SequenceFile.Sorter sorter =
+        new SequenceFile.Sorter(lfs, comparator, valueClass);
+      sorter.sort(file, sortedFile);              // sort
+      lfs.delete(new File(file));                 // remove unsorted
+
+    } finally {
+      sortComplete = true;
     }
-    SequenceFile.Sorter sorter =
-      new SequenceFile.Sorter(lfs, comparator, valueClass);
-    sorter.sort(file, sortedFile);                // sort
-    lfs.delete(new File(file));                   // remove unsorted
 
-    umbilical.progress(getTaskId(), new FloatWritable(2.0f/3.0f));
+    sortPhase.complete();                         // sort is complete
 
     // make output collector
     String name = getOutputName(getPartition());
@@ -198,8 +242,9 @@
     
     // apply reduce function
     SequenceFile.Reader in = new SequenceFile.Reader(lfs, sortedFile);
+    long length = lfs.getLength(new File(sortedFile));
     try {
-      ValuesIterator values = new ValuesIterator(in);
+      ValuesIterator values = new ValuesIterator(in, length, umbilical);
       while (values.more()) {
         reducer.reduce(values.getKey(), values, collector);
         values.nextKey();
@@ -211,7 +256,7 @@
       out.close();
     }
 
-    umbilical.progress(getTaskId(), new FloatWritable(3.0f/3.0f));
+    reportProgress(umbilical);
   }
 
   /** Construct output file names so that, when an output directory listing is

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java?rev=225647&r1=225646&r2=225647&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java Wed Jul 27 15:46:48 2005
@@ -38,18 +38,20 @@
     ReduceTask task = ((ReduceTask)getTask());
     MapOutputFile.removeAll(task.getTaskId());    // cleanup from failures
     String[] mapTaskIds = task.getMapTaskIds();
-
-    getTracker().progress(task.getTaskId(), new FloatWritable(0.0f/3.0f));
+    final Progress copyPhase = getTask().getTaskProgress().phase();
 
     // we need input from every map task
     HashSet needed = new HashSet();
     for (int i = 0; i < mapTaskIds.length; i++) {
       needed.add(mapTaskIds[i]);
+      copyPhase.addPhase();                       // add sub-phase per file
     }
 
     InterTrackerProtocol jobClient = getTracker().getJobClient();
     while (needed.size() > 0) {
 
+      getTask().reportProgress(getTracker());
+
       // get list of available map output locations from job tracker
       String[] neededStrings =
         (String[])needed.toArray(new String[needed.size()]);
@@ -74,24 +76,41 @@
         MapOutputProtocol client =
           (MapOutputProtocol)RPC.getProxy(MapOutputProtocol.class, addr);
 
+        MapOutputFile.setProgressReporter(new MapOutputFile.ProgressReporter(){
+            public void progress(float progress) {
+              copyPhase.phase().set(progress);
+              try {
+                getTask().reportProgress(getTracker());
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
 
+        getTask().reportProgress(getTracker());
         try {
           LOG.info("Copying "+loc.getMapTaskId()+" from "+addr);
+          
           client.getFile(loc.getMapTaskId(), task.getTaskId(),
                          new IntWritable(task.getPartition()));
           
           needed.remove(loc.getMapTaskId());     // success: remove from needed
           
           LOG.info("Copy complete: "+loc.getMapTaskId()+" from "+addr);
-
+          
+          copyPhase.startNextPhase();
+          
         } catch (IOException e) {                 // failed: try again later
           LOG.info("Copy failed: "+loc.getMapTaskId()+" from "+addr);
+          
+        } finally {
+          MapOutputFile.setProgressReporter(null);
         }
         
       }
-
+      
     }
-    getTracker().progress(task.getTaskId(), new FloatWritable(1.0f/3.0f));
+    getTask().reportProgress(getTracker());
   }
 
   /** Delete all of the temporary map output files. */

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Task.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Task.java?rev=225647&r1=225646&r2=225647&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Task.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Task.java Wed Jul 27 15:46:48 2005
@@ -17,6 +17,7 @@
 package org.apache.nutch.mapred;
 
 import org.apache.nutch.io.*;
+import org.apache.nutch.util.*;
 
 import java.io.*;
 import java.net.*;
@@ -24,7 +25,6 @@
 
 /** Base class for tasks. */
 public abstract class Task implements Writable {
-
   ////////////////////////////////////////////
   // Fields
   ////////////////////////////////////////////
@@ -75,5 +75,29 @@
 
   /** Return an approprate thread runner for this task. */
   public abstract TaskRunner createRunner(TaskTracker tracker);
+
+  /** The number of milliseconds between progress reports. */
+  public static final int PROGRESS_INTERVAL = 1000;
+
+  private transient Progress taskProgress = new Progress();
+  private transient long nextProgressTime =
+    System.currentTimeMillis() + PROGRESS_INTERVAL;
+  /** Returns the {@link Progress} node that tracks this task. */
+  public Progress getTaskProgress() { return taskProgress; }
+  /** Sets this task's progress, then reports it if a report is due. */
+  public void reportProgress(TaskUmbilicalProtocol umbilical, float progress)
+    throws IOException {
+    taskProgress.set(progress);
+    reportProgress(umbilical);
+  }
+  /** Sends progress over the umbilical once PROGRESS_INTERVAL has elapsed. */
+  public void reportProgress(TaskUmbilicalProtocol umbilical)
+    throws IOException {
+    long now = System.currentTimeMillis();
+    if (now > nextProgressTime)  {
+      umbilical.progress(getTaskId(), new FloatWritable(taskProgress.get()));
+      nextProgressTime = now + PROGRESS_INTERVAL;
+    }
+  }
 
 }

Added: lucene/nutch/branches/mapred/src/java/org/apache/nutch/util/Progress.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/util/Progress.java?rev=225647&view=auto
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/util/Progress.java (added)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/util/Progress.java Wed Jul 27 15:46:48 2005
@@ -0,0 +1,87 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.util;
+
+import java.util.ArrayList;
+
+/** Utility to assist with generation of progress reports.  Applications build
+ * a hierarchy of {@link Progress} instances, each modelling a phase of
+ * execution.  The root is constructed with {@link #Progress()}.  Nodes for
+ * sub-phases are created by calling {@link #addPhase()}.
+ */
+public class Progress {
+  private float progress;                         // progress of a leaf node
+  private int currentPhase;                       // index of the active child
+  private ArrayList phases = new ArrayList();     // child nodes, in order
+  private Progress parent;                        // null for the root
+  private float progressPerPhase;                 // 1 / number of children
+
+  /** Creates a new root node. */
+  public Progress() {}
+
+  /** Adds a child node to this node; all children are weighted equally. */
+  public Progress addPhase() {
+    Progress phase = new Progress();
+    phases.add(phase);
+    phase.parent = this;
+    progressPerPhase = 1.0f / (float)phases.size();
+    return phase;
+  }
+
+  /** Called during execution to move to the next phase at this level in the
+   * tree. */
+  public void startNextPhase() {
+    currentPhase++;
+  }
+
+  /** Returns the current sub-node executing. */
+  public Progress phase() {
+    return (Progress)phases.get(currentPhase);
+  }
+
+  /** Completes this node, moving the parent node to its next child. */
+  public void complete() {
+    progress = 1.0f;
+    if (parent != null) {
+      parent.startNextPhase();
+    }
+  }
+
+  /** Called during execution on a leaf node to set its progress. */
+  public void set(float progress) {
+    this.progress = progress;
+  }
+
+  /** Returns the overall progress of the root. */
+  public float get() {
+    Progress node = this;
+    while (node.parent != null) {                 // find the root
+      node = node.parent;                         // climb one level at a time
+    }
+    return node.getInternal();
+  }
+
+  /** Computes progress in this node; a node past its last child is done. */
+  private float getInternal() {
+    int phaseCount = phases.size();
+    if (phaseCount == 0) {
+      return progress;
+    }
+    float sub = currentPhase < phaseCount ? phase().getInternal() : 0.0f;
+    return progressPerPhase*(currentPhase + sub);
+  }
+}