You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/07/28 00:46:54 UTC
svn commit: r225647 - in
/lucene/nutch/branches/mapred/src/java/org/apache/nutch:
mapred/MapOutputFile.java mapred/MapTask.java mapred/ReduceTask.java
mapred/ReduceTaskRunner.java mapred/Task.java util/Progress.java
Author: cutting
Date: Wed Jul 27 15:46:48 2005
New Revision: 225647
URL: http://svn.apache.org/viewcvs?rev=225647&view=rev
Log:
Report progress every second, rather than every 100th of the task, so that progress can be used as a heartbeat, to tell if task is hung.
Added:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/util/Progress.java
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Task.java
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java?rev=225647&r1=225646&r2=225647&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputFile.java Wed Jul 27 15:46:48 2005
@@ -31,6 +31,17 @@
private String reduceTaskId;
private int partition;
+ /** Permits reporting of file copy progress; the reporter in effect is installed per thread via setProgressReporter. */
+ public static interface ProgressReporter {
+ void progress(float progress) throws IOException; // NOTE(review): value is intended as the fraction of the file copied so far - confirm against the caller
+ }
+
+ private static final ThreadLocal REPORTERS = new ThreadLocal(); // holds one ProgressReporter slot per thread
+
+ public static void setProgressReporter(ProgressReporter reporter) { // affects the calling thread only; null clears the slot
+ REPORTERS.set(reporter);
+ }
+
/** Create a local map output file name.
* @param mapTaskId a map task id
* @param partition a reduce partition
@@ -96,18 +107,24 @@
this.reduceTaskId = UTF8.readString(in);
this.partition = in.readInt();
+ ProgressReporter reporter = (ProgressReporter)REPORTERS.get();
+
// read the length-prefixed file content into a local file
File file = getInputFile(mapTaskId, reduceTaskId);
long length = in.readLong();
+ long unread = length;
file.getParentFile().mkdirs(); // make directory
OutputStream out = new FileOutputStream(file);
try {
byte[] buffer = new byte[8192];
- while (length > 0) {
- int bytesToRead = Math.min((int) length, buffer.length);
+ while (unread > 0) { // copy the length-prefixed content one buffer at a time
+ int bytesToRead = (int)Math.min(unread, buffer.length); // min in long first: narrowing a >2GB count before the min would overflow
in.readFully(buffer, 0, bytesToRead);
out.write(buffer, 0, bytesToRead);
- length -= bytesToRead;
+ unread -= bytesToRead;
+ if (reporter != null) {
+ reporter.progress((length-unread)/(float)length); // fraction copied so far; length > 0 whenever this loop runs
+ }
}
} finally {
out.close();
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java?rev=225647&r1=225646&r2=225647&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java Wed Jul 27 15:46:48 2005
@@ -91,18 +91,14 @@
job.getInputFormat().getRecordReader(NutchFileSystem.get(),split,job);
RecordReader in = new RecordReader() { // wrap in progress reporter
- private float end = (float)split.getLength();
- private float lastProgress = 0.0f;
+ private float perByte = 1.0f /(float)split.getLength();
public synchronized boolean next(Writable key, Writable value)
throws IOException {
float progress = // compute progress
- (float)Math.min((rawIn.getPos()-split.getStart())/end, 1.0f);
- if ((progress - lastProgress) > 0.01f) { // 100 progress reports
- umbilical.progress(getTaskId(), new FloatWritable(progress));
- lastProgress = progress;
- }
+ (float)Math.min((rawIn.getPos()-split.getStart())*perByte, 1.0f);
+ reportProgress(umbilical, progress);
return rawIn.next(key, value);
}
@@ -120,7 +116,7 @@
((CombiningCollector)collector).flush();
}
- umbilical.progress(getTaskId(), new FloatWritable(1.0f)); // done
+ reportProgress(umbilical, 1.0f); // done
} finally {
in.close(); // close input
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java?rev=225647&r1=225646&r2=225647&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java Wed Jul 27 15:46:48 2005
@@ -31,6 +31,12 @@
private String[] mapTaskIds;
private int partition;
+ private volatile boolean sortComplete; // set by run(), polled by the sort progress thread: must be volatile to be seen there
+
+ private Progress copyPhase = getTaskProgress().addPhase();
+ private Progress appendPhase = getTaskProgress().addPhase();
+ private Progress sortPhase = getTaskProgress().addPhase();
+ private Progress reducePhase = getTaskProgress().addPhase();
public ReduceTask() {}
@@ -71,15 +77,21 @@
}
/** Iterates values while keys match in sorted input. */
- private static class ValuesIterator implements Iterator {
+ private class ValuesIterator implements Iterator {
private SequenceFile.Reader in; // input file
private WritableComparable key; // current key
private Writable value; // current value
private boolean hasNext; // more w/ this key
private boolean more; // more in file
+ private float progPerByte;
+ private TaskUmbilicalProtocol umbilical;
- public ValuesIterator (SequenceFile.Reader in) throws IOException {
+ public ValuesIterator (SequenceFile.Reader in, long length,
+ TaskUmbilicalProtocol umbilical)
+ throws IOException {
this.in = in;
+ this.progPerByte = 1.0f / (float)length;
+ this.umbilical = umbilical;
getNext();
}
@@ -114,6 +126,9 @@
public WritableComparable getKey() { return key; }
private void getNext() throws IOException {
+ reducePhase.set(in.getPosition()*progPerByte); // update progress
+ reportProgress(umbilical);
+
Writable lastKey = key; // save previous key
try {
key = (WritableComparable)in.getKeyClass().newInstance();
@@ -134,56 +149,85 @@
}
}
- public void run(JobConf job, TaskUmbilicalProtocol umbilical)
+ public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException {
Class keyClass = job.getOutputKeyClass();
Class valueClass = job.getOutputValueClass();
Reducer reducer = (Reducer)job.newInstance(job.getReducerClass());
-
- umbilical.progress(getTaskId(), new FloatWritable(1.0f/3.0f));
-
- // open a file to collect map output
NutchFileSystem lfs = NutchFileSystem.getNamed("local");
+
+ copyPhase.complete(); // copy is already complete
+
+ // spawn a thread to give sort progress heartbeats
+ Thread sortProgress = new Thread() {
+ public void run() {
+ while (!sortComplete) {
+ try {
+ reportProgress(umbilical);
+ Thread.sleep(PROGRESS_INTERVAL);
+ } catch (InterruptedException e) {
+ continue;
+ } catch (Throwable e) {
+ return;
+ }
+ }
+ }
+ };
+ sortProgress.setName("Sort progress reporter for task "+getTaskId());
+
File taskDir = new File(LOCAL_DIR, getTaskId());
String file = new File(taskDir, "all.in").toString();
- SequenceFile.Writer writer =
- new SequenceFile.Writer(lfs, file, keyClass, valueClass);
+ String sortedFile = file+".sorted";
+
try {
- // append all input files into a single input file
- WritableComparable key = (WritableComparable)job.newInstance(keyClass);
- Writable value = (Writable)job.newInstance(valueClass);
-
- for (int i = 0; i < mapTaskIds.length; i++) {
- String partFile =
- MapOutputFile.getInputFile(mapTaskIds[i], getTaskId()).toString();
- SequenceFile.Reader in = new SequenceFile.Reader(lfs, partFile);
- try {
- while(in.next(key, value)) {
- writer.append(key, value);
+ sortProgress.start();
+
+ // open a file to collect map output
+ SequenceFile.Writer writer =
+ new SequenceFile.Writer(lfs, file, keyClass, valueClass);
+ try {
+ // append all input files into a single input file
+ WritableComparable key = (WritableComparable)job.newInstance(keyClass);
+ Writable value = (Writable)job.newInstance(valueClass);
+
+ for (int i = 0; i < mapTaskIds.length; i++) {
+ appendPhase.addPhase(); // one per file
+ }
+
+ for (int i = 0; i < mapTaskIds.length; i++) {
+ String partFile =
+ MapOutputFile.getInputFile(mapTaskIds[i], getTaskId()).toString();
+ SequenceFile.Reader in = new SequenceFile.Reader(lfs, partFile);
+ try {
+ while(in.next(key, value)) {
+ writer.append(key, value);
+ }
+ } finally {
+ in.close();
}
- } finally {
- in.close();
+ appendPhase.startNextPhase();
}
+
+ } finally {
+ writer.close();
}
- } finally {
- writer.close();
- }
- // sort the input file
- String sortedFile = file+".sorted";
- WritableComparator comparator = null;
- try {
- comparator =
+ appendPhase.complete(); // append is complete
+
+ // sort the input file
+ WritableComparator comparator =
(WritableComparator)job.newInstance(job.getOutputKeyComparatorClass());
- } catch (Exception e) {
- throw new RuntimeException(e);
+
+ SequenceFile.Sorter sorter =
+ new SequenceFile.Sorter(lfs, comparator, valueClass);
+ sorter.sort(file, sortedFile); // sort
+ lfs.delete(new File(file)); // remove unsorted
+
+ } finally {
+ sortComplete = true;
}
- SequenceFile.Sorter sorter =
- new SequenceFile.Sorter(lfs, comparator, valueClass);
- sorter.sort(file, sortedFile); // sort
- lfs.delete(new File(file)); // remove unsorted
- umbilical.progress(getTaskId(), new FloatWritable(2.0f/3.0f));
+ sortPhase.complete(); // sort is complete
// make output collector
String name = getOutputName(getPartition());
@@ -198,8 +242,9 @@
// apply reduce function
SequenceFile.Reader in = new SequenceFile.Reader(lfs, sortedFile);
+ long length = lfs.getLength(new File(sortedFile));
try {
- ValuesIterator values = new ValuesIterator(in);
+ ValuesIterator values = new ValuesIterator(in, length, umbilical);
while (values.more()) {
reducer.reduce(values.getKey(), values, collector);
values.nextKey();
@@ -211,7 +256,7 @@
out.close();
}
- umbilical.progress(getTaskId(), new FloatWritable(3.0f/3.0f));
+ reportProgress(umbilical);
}
/** Construct output file names so that, when an output directory listing is
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java?rev=225647&r1=225646&r2=225647&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java Wed Jul 27 15:46:48 2005
@@ -38,18 +38,20 @@
ReduceTask task = ((ReduceTask)getTask());
MapOutputFile.removeAll(task.getTaskId()); // cleanup from failures
String[] mapTaskIds = task.getMapTaskIds();
-
- getTracker().progress(task.getTaskId(), new FloatWritable(0.0f/3.0f));
+ final Progress copyPhase = getTask().getTaskProgress().phase();
// we need input from every map task
HashSet needed = new HashSet();
for (int i = 0; i < mapTaskIds.length; i++) {
needed.add(mapTaskIds[i]);
+ copyPhase.addPhase(); // add sub-phase per file
}
InterTrackerProtocol jobClient = getTracker().getJobClient();
while (needed.size() > 0) {
+ getTask().reportProgress(getTracker());
+
// get list of available map output locations from job tracker
String[] neededStrings =
(String[])needed.toArray(new String[needed.size()]);
@@ -74,24 +76,41 @@
MapOutputProtocol client =
(MapOutputProtocol)RPC.getProxy(MapOutputProtocol.class, addr);
+ MapOutputFile.setProgressReporter(new MapOutputFile.ProgressReporter(){
+ public void progress(float progress) {
+ copyPhase.phase().set(progress);
+ try {
+ getTask().reportProgress(getTracker());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ getTask().reportProgress(getTracker());
try {
LOG.info("Copying "+loc.getMapTaskId()+" from "+addr);
+
client.getFile(loc.getMapTaskId(), task.getTaskId(),
new IntWritable(task.getPartition()));
needed.remove(loc.getMapTaskId()); // success: remove from needed
LOG.info("Copy complete: "+loc.getMapTaskId()+" from "+addr);
-
+
+ copyPhase.startNextPhase();
+
} catch (IOException e) { // failed: try again later
LOG.info("Copy failed: "+loc.getMapTaskId()+" from "+addr);
+
+ } finally {
+ MapOutputFile.setProgressReporter(null);
}
}
-
+
}
- getTracker().progress(task.getTaskId(), new FloatWritable(1.0f/3.0f));
+ getTask().reportProgress(getTracker());
}
/** Delete all of the temporary map output files. */
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Task.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Task.java?rev=225647&r1=225646&r2=225647&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Task.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Task.java Wed Jul 27 15:46:48 2005
@@ -17,6 +17,7 @@
package org.apache.nutch.mapred;
import org.apache.nutch.io.*;
+import org.apache.nutch.util.*;
import java.io.*;
import java.net.*;
@@ -24,7 +25,6 @@
/** Base class for tasks. */
public abstract class Task implements Writable {
-
////////////////////////////////////////////
// Fields
////////////////////////////////////////////
@@ -75,5 +75,29 @@
/** Return an approprate thread runner for this task. */
public abstract TaskRunner createRunner(TaskTracker tracker);
+
+ /** The number of milliseconds between progress reports. */
+ public static final int PROGRESS_INTERVAL = 1000;
+
+ private transient Progress taskProgress = new Progress();
+ private transient long nextProgressTime =
+ System.currentTimeMillis() + PROGRESS_INTERVAL;
+
+ public Progress getTaskProgress() { return taskProgress; }
+
+ public void reportProgress(TaskUmbilicalProtocol umbilical, float progress)
+     throws IOException {
+   taskProgress.set(progress);            // record the new value first ...
+   reportProgress(umbilical);             // ... then send it if a report is due
+ }
+
+ public void reportProgress(TaskUmbilicalProtocol umbilical)
+     throws IOException {
+   final long currentMillis = System.currentTimeMillis();
+   if (currentMillis <= nextProgressTime) return;   // next report is not due yet
+   // the deadline below only advances once the call has returned normally
+   umbilical.progress(getTaskId(), new FloatWritable(taskProgress.get()));
+   nextProgressTime = currentMillis + PROGRESS_INTERVAL;
+ }
}
Added: lucene/nutch/branches/mapred/src/java/org/apache/nutch/util/Progress.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/util/Progress.java?rev=225647&view=auto
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/util/Progress.java (added)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/util/Progress.java Wed Jul 27 15:46:48 2005
@@ -0,0 +1,87 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.util;
+
+import java.util.ArrayList;
+
+/** Utility to assist with generation of progress reports.  Applications build
+ * a hierarchy of {@link Progress} instances, each modelling a phase of
+ * execution.  The root is constructed with {@link #Progress()}.  Nodes for
+ * sub-phases are created by calling {@link #addPhase()}; siblings weigh equally.
+ */
+public class Progress {
+  private float progress;                       // value of a leaf, see set()/complete()
+  private int currentPhase;                     // index of the child now executing
+  private ArrayList phases = new ArrayList();   // children, in execution order
+  private Progress parent;                      // null for the root
+  private float progressPerPhase;               // 1/phases.size(), kept by addPhase()
+
+  /** Creates a new root node. */
+  public Progress() {}
+
+  /** Adds a child node to this node and returns it. */
+  public Progress addPhase() {
+    Progress phase = new Progress();
+    phases.add(phase);
+    phase.parent = this;
+    progressPerPhase = 1.0f / (float)phases.size();
+    return phase;
+  }
+
+  /** Called during execution to move to the next phase at this level in the
+   * tree.  Once the last phase has been passed there is no current phase. */
+  public void startNextPhase() {
+    currentPhase++;
+  }
+
+  /** Returns the current sub-node executing; throws if there is none. */
+  public Progress phase() {
+    return (Progress)phases.get(currentPhase);
+  }
+
+  /** Completes this node, moving the parent node to its next child. */
+  public void complete() {
+    progress = 1.0f;
+    if (parent != null) {
+      parent.startNextPhase();
+    }
+  }
+
+  /** Called during execution on a leaf node to set its progress. */
+  public void set(float progress) {
+    this.progress = progress;
+  }
+
+  /** Returns the overall progress of the root, from any node of the tree. */
+  public float get() {
+    Progress node = this;
+    while (node.parent != null) {               // find the root
+      node = node.parent;                       // climb from node, not from this
+    }
+    return node.getInternal();
+  }
+
+  /** Computes progress in this node: own value for a leaf, else from children. */
+  private float getInternal() {
+    int phaseCount = phases.size();
+    if (phaseCount == 0) {
+      return progress;
+    }
+    float sub = currentPhase < phaseCount ? phase().getInternal() : 0.0f; // no child left once all are done
+    return progressPerPhase*(currentPhase + sub);
+  }
+}