You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/07/30 00:41:33 UTC
svn commit: r226416 - in /lucene/nutch/branches/mapred/src:
java/org/apache/nutch/crawl/ java/org/apache/nutch/io/
java/org/apache/nutch/mapred/ java/org/apache/nutch/mapred/lib/
java/org/apache/nutch/util/ test/org/apache/nutch/fs/
Author: cutting
Date: Fri Jul 29 15:41:07 2005
New Revision: 226416
URL: http://svn.apache.org/viewcvs?rev=226416&view=rev
Log:
Pass a Reporter to Mapper and Reducer so that applications can set a status string; also includes a few other fixes.
Added:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Reporter.java
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/LinkDb.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/CombiningCollector.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputLocation.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunnable.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunner.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Mapper.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Reducer.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Task.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/IdentityMapper.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/IdentityReducer.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/InverseMapper.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/LongSumReducer.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/RegexMapper.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/TokenCountMapper.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/util/Progress.java
lucene/nutch/branches/mapred/src/test/org/apache/nutch/fs/TestNutchFileSystem.java
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java Fri Jul 29 15:41:07 2005
@@ -21,6 +21,7 @@
import java.io.IOException;
import org.apache.nutch.io.*;
+import org.apache.nutch.util.*;
import org.apache.nutch.mapred.*;
/** Merge new page entries with existing entries. */
@@ -32,7 +33,8 @@
}
public void reduce(WritableComparable key, Iterator values,
- OutputCollector output) throws IOException {
+ OutputCollector output, Reporter reporter)
+ throws IOException {
CrawlDatum highest = null;
CrawlDatum old = null;
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java Fri Jul 29 15:41:07 2005
@@ -53,6 +53,7 @@
private RecordReader input;
private OutputCollector output;
+ private Reporter reporter;
private int activeThreads;
private int maxRedirect;
@@ -193,21 +194,21 @@
public Fetcher(NutchConf conf) { super(conf); }
- private synchronized void updateStatus(int bytesInPage) {
+ private synchronized void updateStatus(int bytesInPage) throws IOException {
pages++;
bytes += bytesInPage;
if ((pages % 100) == 0) { // show status every 100pp
long elapsed = (System.currentTimeMillis() - start)/1000;
- LOG.info( "status: "
- + pages + " pages, "
- + errors + " errors, "
- + bytes + " bytes, "
- + elapsed + " seconds");
- LOG.info("status: "
- + ((float)pages)/elapsed+" pages/s, "
- + ((((float)bytes)*8)/1024)/elapsed+" kb/s, "
- + ((float)bytes)/pages+" bytes/page");
+ String line1 =
+ pages+" pages, "+errors+" errors, "+bytes+" bytes, "+elapsed+" secs";
+ String line2 =
+ + ((float)pages)/elapsed+" pages/s, "
+ + ((((float)bytes)*8)/1024)/elapsed+" kb/s, "
+ + ((float)bytes)/pages+" bytes/page";
+ LOG.info( "status: "+line1);
+ LOG.info( "status: "+line2);
+ reporter.setStatus(line2);
}
}
@@ -218,11 +219,12 @@
}
}
- public void run(RecordReader input, OutputCollector output)
- throws IOException {
+ public void run(RecordReader input, OutputCollector output,
+ Reporter reporter) throws IOException {
this.input = input;
this.output = output;
+ this.reporter = reporter;
this.maxRedirect = getConf().getInt("http.redirect.max", 3);
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java Fri Jul 29 15:41:07 2005
@@ -50,7 +50,8 @@
/** Select & invert subset due for fetch. */
public void map(WritableComparable key, Writable value,
- OutputCollector output) throws IOException {
+ OutputCollector output, Reporter reporter)
+ throws IOException {
CrawlDatum crawlDatum = (CrawlDatum)value;
if (crawlDatum.getStatus() == CrawlDatum.STATUS_DB_GONE)
@@ -82,7 +83,9 @@
/** Collect until limit is reached. */
public void reduce(WritableComparable key, Iterator values,
- OutputCollector output) throws IOException {
+ OutputCollector output, Reporter reporter)
+ throws IOException {
+
while (values.hasNext() && ++count < limit) {
output.collect(key, (Writable)values.next());
}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java Fri Jul 29 15:41:07 2005
@@ -124,7 +124,8 @@
}
public void reduce(WritableComparable key, Iterator values,
- OutputCollector output) throws IOException {
+ OutputCollector output, Reporter reporter)
+ throws IOException {
Inlinks inlinks = null;
CrawlDatum crawlDatum = null;
ParseData parseData = null;
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Injector.java Fri Jul 29 15:41:07 2005
@@ -43,7 +43,8 @@
}
public void map(WritableComparable key, Writable val,
- OutputCollector output) throws IOException {
+ OutputCollector output, Reporter reporter)
+ throws IOException {
UTF8 value = (UTF8)val;
String url = value.toString(); // value is line of text
try {
@@ -66,7 +67,8 @@
public void configure(JobConf job) {}
public void reduce(WritableComparable key, Iterator values,
- OutputCollector output) throws IOException {
+ OutputCollector output, Reporter reporter)
+ throws IOException {
output.collect(key, (Writable)values.next()); // just collect first value
}
}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/LinkDb.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/LinkDb.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/LinkDb.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/LinkDb.java Fri Jul 29 15:41:07 2005
@@ -51,7 +51,8 @@
}
public void map(WritableComparable key, Writable value,
- OutputCollector output) throws IOException {
+ OutputCollector output, Reporter reporter)
+ throws IOException {
String fromUrl = key.toString();
ParseData parseData = (ParseData)value;
Outlink[] outlinks = parseData.getOutlinks();
@@ -69,7 +70,8 @@
}
public void reduce(WritableComparable key, Iterator values,
- OutputCollector output) throws IOException {
+ OutputCollector output, Reporter reporter)
+ throws IOException {
Inlinks result = null;
while (values.hasNext()) {
Inlinks inlinks = (Inlinks)values.next();
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java Fri Jul 29 15:41:07 2005
@@ -54,7 +54,8 @@
}
public void map(WritableComparable key, Writable value,
- OutputCollector output) throws IOException {
+ OutputCollector output, Reporter reporter)
+ throws IOException {
Content content = (Content)value;
Parse parse = null;
@@ -77,7 +78,8 @@
}
public void reduce(WritableComparable key, Iterator values,
- OutputCollector output) throws IOException {
+ OutputCollector output, Reporter reporter)
+ throws IOException {
output.collect(key, (Writable)values.next()); // collect first value
}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java Fri Jul 29 15:41:07 2005
@@ -76,9 +76,6 @@
throws IOException {
this.nfs = nfs;
this.target = new File(name);
- if (nfs.exists(target)) {
- throw new IOException("already exists: " + target);
- }
init(new NFSDataOutputStream(nfs.create(target)),
keyClass, valClass);
}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/CombiningCollector.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/CombiningCollector.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/CombiningCollector.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/CombiningCollector.java Fri Jul 29 15:41:07 2005
@@ -21,7 +21,7 @@
import java.util.*;
import org.apache.nutch.io.*;
-import org.apache.nutch.util.NutchConf;
+import org.apache.nutch.util.*;
/** Implements partial value reduction during mapping. This can minimize the
* size of intermediate data. Buffers a list of values for each unique key,
@@ -37,10 +37,13 @@
private JobConf job;
private OutputCollector out;
private Reducer combiner;
+ private Reporter reporter;
- public CombiningCollector(JobConf job, OutputCollector out) {
+ public CombiningCollector(JobConf job, OutputCollector out,
+ Reporter reporter) {
this.job = job;
this.out = out;
+ this.reporter = reporter;
this.combiner = (Reducer)job.newInstance(job.getCombinerClass());
this.keyToValues = new TreeMap
((Comparator)job.newInstance(job.getOutputKeyComparatorClass()));
@@ -72,7 +75,7 @@
Map.Entry pair = (Map.Entry)pairs.next();
combiner.reduce((WritableComparable)pair.getKey(),
((ArrayList)pair.getValue()).iterator(),
- out);
+ out, reporter);
}
keyToValues.clear();
count = 0;
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputLocation.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputLocation.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputLocation.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapOutputLocation.java Fri Jul 29 15:41:07 2005
@@ -61,4 +61,8 @@
this.port = in.readInt();
}
+ public String toString() {
+ return mapTaskId+"@"+host+":"+port;
+ }
+
}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunnable.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunnable.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunnable.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunnable.java Fri Jul 29 15:41:07 2005
@@ -28,6 +28,6 @@
* @param input the {@link RecordReader} with input key/value pairs.
* @param output the {@link OutputCollector} for mapped key/value pairs.
*/
- void run(RecordReader input, OutputCollector output)
+ void run(RecordReader input, OutputCollector output, Reporter reporter)
throws IOException;
}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunner.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunner.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunner.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapRunner.java Fri Jul 29 15:41:07 2005
@@ -35,7 +35,8 @@
this.inputValueClass = job.getInputValueClass();
}
- public void run(RecordReader input, OutputCollector output)
+ public void run(RecordReader input, OutputCollector output,
+ Reporter reporter)
throws IOException {
while (true) {
// allocate new key & value instances
@@ -48,7 +49,7 @@
return;
// map pair to output
- mapper.map(key, value, output);
+ mapper.map(key, value, output, reporter);
}
}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/MapTask.java Fri Jul 29 15:41:07 2005
@@ -81,10 +81,11 @@
};
OutputCollector collector = partCollector;
+ Reporter reporter = getReporter(umbilical, getProgress());
boolean combining = job.getCombinerClass() != null;
if (combining) { // add combining collector
- collector = new CombiningCollector(job, partCollector);
+ collector = new CombiningCollector(job, partCollector, reporter);
}
final RecordReader rawIn = // open input
@@ -110,7 +111,7 @@
(MapRunnable)job.newInstance(job.getMapRunnerClass());
try {
- runner.run(in, collector); // run the map
+ runner.run(in, collector, reporter); // run the map
if (combining) { // flush combiner
((CombiningCollector)collector).flush();
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Mapper.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Mapper.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Mapper.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Mapper.java Fri Jul 29 15:41:07 2005
@@ -36,6 +36,7 @@
* @param value the values
* @param output collects mapped keys and values
*/
- void map(WritableComparable key, Writable value, OutputCollector output)
+ void map(WritableComparable key, Writable value,
+ OutputCollector output, Reporter reporter)
throws IOException;
}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java Fri Jul 29 15:41:07 2005
@@ -33,10 +33,12 @@
private int partition;
private boolean sortComplete;
- private Progress copyPhase = getTaskProgress().addPhase();
- private Progress appendPhase = getTaskProgress().addPhase();
- private Progress sortPhase = getTaskProgress().addPhase();
- private Progress reducePhase = getTaskProgress().addPhase();
+ { getProgress().setStatus("reduce"); }
+
+ private Progress copyPhase = getProgress().addPhase("copy");
+ private Progress appendPhase = getProgress().addPhase("append");
+ private Progress sortPhase = getProgress().addPhase("sort");
+ private Progress reducePhase = getProgress().addPhase("reduce");
public ReduceTask() {}
@@ -165,25 +167,27 @@
new SequenceFile.Writer(lfs, file, keyClass, valueClass);
try {
// append all input files into a single input file
- WritableComparable key = (WritableComparable)job.newInstance(keyClass);
- Writable value = (Writable)job.newInstance(valueClass);
-
for (int i = 0; i < mapTaskIds.length; i++) {
appendPhase.addPhase(); // one per file
}
+ DataOutputBuffer buffer = new DataOutputBuffer();
+
for (int i = 0; i < mapTaskIds.length; i++) {
File partFile =
MapOutputFile.getInputFile(mapTaskIds[i], getTaskId());
float progPerByte = 1.0f / lfs.getLength(partFile);
Progress phase = appendPhase.phase();
+ phase.setStatus(partFile.toString());
SequenceFile.Reader in =
new SequenceFile.Reader(lfs, partFile.toString());
try {
- while(in.next(key, value)) {
- writer.append(key, value);
+ int keyLen;
+ while((keyLen = in.next(buffer)) > 0) {
+ writer.append(buffer.getData(), 0, buffer.getLength(), keyLen);
phase.set(in.getPosition()*progPerByte);
reportProgress(umbilical);
+ buffer.reset();
}
} finally {
in.close();
@@ -247,11 +251,12 @@
// apply reduce function
SequenceFile.Reader in = new SequenceFile.Reader(lfs, sortedFile);
+ Reporter reporter = getReporter(umbilical, getProgress());
long length = lfs.getLength(new File(sortedFile));
try {
ValuesIterator values = new ValuesIterator(in, length, umbilical);
while (values.more()) {
- reducer.reduce(values.getKey(), values, collector);
+ reducer.reduce(values.getKey(), values, collector, reporter);
values.nextKey();
}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java Fri Jul 29 15:41:07 2005
@@ -38,7 +38,7 @@
ReduceTask task = ((ReduceTask)getTask());
MapOutputFile.removeAll(task.getTaskId()); // cleanup from failures
String[] mapTaskIds = task.getMapTaskIds();
- final Progress copyPhase = getTask().getTaskProgress().phase();
+ final Progress copyPhase = getTask().getProgress().phase();
// we need input from every map task
HashSet needed = new HashSet();
@@ -89,19 +89,17 @@
getTask().reportProgress(getTracker());
try {
- LOG.info("Copying "+loc.getMapTaskId()+" from "+addr);
+ copyPhase.phase().setStatus(loc.toString());
client.getFile(loc.getMapTaskId(), task.getTaskId(),
new IntWritable(task.getPartition()));
needed.remove(loc.getMapTaskId()); // success: remove from needed
- LOG.info("Copy complete: "+loc.getMapTaskId()+" from "+addr);
-
copyPhase.startNextPhase();
} catch (IOException e) { // failed: try again later
- LOG.info("Copy failed: "+loc.getMapTaskId()+" from "+addr);
+ LOG.warning("copy failed: "+loc.getMapTaskId()+" from "+addr);
} finally {
MapOutputFile.setProgressReporter(null);
@@ -115,7 +113,7 @@
/** Delete all of the temporary map output files. */
public void close() throws IOException {
- LOG.info("Task "+getTask()+" done; removing files.");
+ getTask().getProgress().setStatus("closed");
MapOutputFile.removeAll(getTask().getTaskId());
}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Reducer.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Reducer.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Reducer.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Reducer.java Fri Jul 29 15:41:07 2005
@@ -35,6 +35,7 @@
* @param values the values to combine
* @param output to collect combined values
*/
- void reduce(WritableComparable key, Iterator values, OutputCollector output)
+ void reduce(WritableComparable key, Iterator values,
+ OutputCollector output, Reporter reporter)
throws IOException;
}
Added: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Reporter.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Reporter.java?rev=226416&view=auto
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Reporter.java (added)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Reporter.java Fri Jul 29 15:41:07 2005
@@ -0,0 +1,28 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.mapred;
+
+import java.io.IOException;
+
+/** Passed to application code to permit alteration of status. */
+public interface Reporter {
+ /** Alter the application's status description.
+ *
+ * @param status a brief description of the current status
+ */
+ void setStatus(String status) throws IOException;
+}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Task.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Task.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Task.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/Task.java Fri Jul 29 15:41:07 2005
@@ -83,7 +83,17 @@
private transient long nextProgressTime =
System.currentTimeMillis() + PROGRESS_INTERVAL;
- public Progress getTaskProgress() { return taskProgress; }
+ public Progress getProgress() { return taskProgress; }
+
+ public Reporter getReporter(final TaskUmbilicalProtocol umbilical,
+ final Progress progress) throws IOException {
+ return new Reporter() {
+ public void setStatus(String status) throws IOException {
+ progress.setStatus(status);
+ reportProgress(umbilical);
+ }
+ };
+ }
public void reportProgress(TaskUmbilicalProtocol umbilical, float progress)
throws IOException {
@@ -97,7 +107,9 @@
if (now > nextProgressTime) {
synchronized (this) {
nextProgressTime = now + PROGRESS_INTERVAL;
- umbilical.progress(getTaskId(), taskProgress.get(), "");
+ float progress = taskProgress.get();
+ String status = taskProgress.toString();
+ umbilical.progress(getTaskId(), progress, status);
}
}
}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java Fri Jul 29 15:41:07 2005
@@ -355,8 +355,8 @@
/**
* The task is reporting its progress
*/
- public synchronized void reportProgress(float p) {
- LOG.info("Progress for task " + task.getTaskId() + " is " + p);
+ public synchronized void reportProgress(float p, String state) {
+ LOG.info(task.getTaskId()+" "+p+"% "+state);
this.progress = p;
this.runstate = TaskStatus.RUNNING;
}
@@ -469,7 +469,7 @@
*/
public void progress(String taskid, float progress, String state) throws IOException {
TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
- tip.reportProgress(progress);
+ tip.reportProgress(progress, state);
}
/**
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/IdentityMapper.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/IdentityMapper.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/IdentityMapper.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/IdentityMapper.java Fri Jul 29 15:41:07 2005
@@ -21,6 +21,7 @@
import org.apache.nutch.mapred.Mapper;
import org.apache.nutch.mapred.OutputCollector;
import org.apache.nutch.mapred.JobConf;
+import org.apache.nutch.mapred.Reporter;
import org.apache.nutch.io.Writable;
import org.apache.nutch.io.WritableComparable;
@@ -33,7 +34,8 @@
/** The identify function. Input key/value pair is written directly to
* output.*/
public void map(WritableComparable key, Writable val,
- OutputCollector output) throws IOException {
+ OutputCollector output, Reporter reporter)
+ throws IOException {
output.collect(key, val);
}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/IdentityReducer.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/IdentityReducer.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/IdentityReducer.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/IdentityReducer.java Fri Jul 29 15:41:07 2005
@@ -23,6 +23,7 @@
import org.apache.nutch.mapred.Reducer;
import org.apache.nutch.mapred.OutputCollector;
import org.apache.nutch.mapred.JobConf;
+import org.apache.nutch.mapred.Reporter;
import org.apache.nutch.io.Writable;
import org.apache.nutch.io.WritableComparable;
@@ -33,8 +34,9 @@
public void configure(JobConf job) {}
/** Writes all keys and values directly to output. */
- public void reduce (WritableComparable key, Iterator values,
- OutputCollector output) throws IOException {
+ public void reduce(WritableComparable key, Iterator values,
+ OutputCollector output, Reporter reporter)
+ throws IOException {
while (values.hasNext()) {
output.collect(key, (Writable)values.next());
}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/InverseMapper.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/InverseMapper.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/InverseMapper.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/InverseMapper.java Fri Jul 29 15:41:07 2005
@@ -21,10 +21,12 @@
import org.apache.nutch.mapred.Mapper;
import org.apache.nutch.mapred.OutputCollector;
import org.apache.nutch.mapred.JobConf;
+import org.apache.nutch.mapred.Reporter;
import org.apache.nutch.io.WritableComparable;
import org.apache.nutch.io.Writable;
+
/** A {@link Mapper} that swaps keys and values. */
public class InverseMapper implements Mapper {
@@ -32,7 +34,8 @@
/** The inverse function. Input keys and values are swapped.*/
public void map(WritableComparable key, Writable value,
- OutputCollector output) throws IOException {
+ OutputCollector output, Reporter reporter)
+ throws IOException {
output.collect((WritableComparable)value, key);
}
}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/LongSumReducer.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/LongSumReducer.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/LongSumReducer.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/LongSumReducer.java Fri Jul 29 15:41:07 2005
@@ -22,19 +22,21 @@
import org.apache.nutch.mapred.Reducer;
import org.apache.nutch.mapred.OutputCollector;
import org.apache.nutch.mapred.JobConf;
+import org.apache.nutch.mapred.Reporter;
+
import org.apache.nutch.io.Writable;
import org.apache.nutch.io.WritableComparable;
import org.apache.nutch.io.LongWritable;
-
/** A {@link Reducer} that sums long values. */
public class LongSumReducer implements Reducer {
public void configure(JobConf job) {}
public void reduce(WritableComparable key, Iterator values,
- OutputCollector output) throws IOException {
+ OutputCollector output, Reporter reporter)
+ throws IOException {
// sum all values for this key
long sum = 0;
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/RegexMapper.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/RegexMapper.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/RegexMapper.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/RegexMapper.java Fri Jul 29 15:41:07 2005
@@ -21,12 +21,14 @@
import org.apache.nutch.mapred.Mapper;
import org.apache.nutch.mapred.OutputCollector;
import org.apache.nutch.mapred.JobConf;
+import org.apache.nutch.mapred.Reporter;
import org.apache.nutch.io.WritableComparable;
import org.apache.nutch.io.Writable;
import org.apache.nutch.io.LongWritable;
import org.apache.nutch.io.UTF8;
+
import java.util.regex.Pattern;
import java.util.regex.Matcher;
@@ -43,7 +45,8 @@
}
public void map(WritableComparable key, Writable value,
- OutputCollector output) throws IOException {
+ OutputCollector output, Reporter reporter)
+ throws IOException {
String text = ((UTF8)value).toString();
Matcher matcher = pattern.matcher(text);
while (matcher.find()) {
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/TokenCountMapper.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/TokenCountMapper.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/TokenCountMapper.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/lib/TokenCountMapper.java Fri Jul 29 15:41:07 2005
@@ -22,12 +22,14 @@
import org.apache.nutch.mapred.Mapper;
import org.apache.nutch.mapred.OutputCollector;
import org.apache.nutch.mapred.JobConf;
+import org.apache.nutch.mapred.Reporter;
import org.apache.nutch.io.WritableComparable;
import org.apache.nutch.io.Writable;
import org.apache.nutch.io.LongWritable;
import org.apache.nutch.io.UTF8;
+
/** A {@link Mapper} that maps text values into <token,freq> pairs. Uses
* {@link StringTokenizer} to break text into tokens. */
public class TokenCountMapper implements Mapper {
@@ -35,7 +37,8 @@
public void configure(JobConf job) {}
public void map(WritableComparable key, Writable value,
- OutputCollector output) throws IOException {
+ OutputCollector output, Reporter reporter)
+ throws IOException {
// get input text
long position = ((LongWritable)key).get(); // key is position in file
String text = ((UTF8)value).toString(); // value is line of text
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/util/Progress.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/util/Progress.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/util/Progress.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/util/Progress.java Fri Jul 29 15:41:07 2005
@@ -24,6 +24,7 @@
* sub-phases are created by calling {@link #addPhase()}.
*/
public class Progress {
+ private String status = "";
private float progress;
private int currentPhase;
private ArrayList phases = new ArrayList();
@@ -33,6 +34,13 @@
/** Creates a new root node. */
public Progress() {}
+ /** Adds a named node to the tree. */
+ public Progress addPhase(String status) {
+ Progress phase = addPhase();
+ phase.setStatus(status);
+ return phase;
+ }
+
/** Adds a node to the tree. */
public Progress addPhase() {
Progress phase = new Progress();
@@ -79,9 +87,30 @@
private float getInternal() {
int phaseCount = phases.size();
if (phaseCount != 0) {
- return progressPerPhase*(currentPhase + phase().getInternal());
+ float subProgress =
+ currentPhase < phaseCount ? phase().getInternal() : 0.0f;
+ return progressPerPhase*(currentPhase + subProgress);
} else {
return progress;
}
}
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ public String toString() {
+ StringBuffer result = new StringBuffer();
+ toString(result);
+ return result.toString();
+ }
+
+ private void toString(StringBuffer buffer) {
+ buffer.append(status);
+ if (phases.size() != 0 && currentPhase < phases.size()) {
+ buffer.append(" > ");
+ phase().toString(buffer);
+ }
+ }
+
}
Modified: lucene/nutch/branches/mapred/src/test/org/apache/nutch/fs/TestNutchFileSystem.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/test/org/apache/nutch/fs/TestNutchFileSystem.java?rev=226416&r1=226415&r2=226416&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/test/org/apache/nutch/fs/TestNutchFileSystem.java (original)
+++ lucene/nutch/branches/mapred/src/test/org/apache/nutch/fs/TestNutchFileSystem.java Fri Jul 29 15:41:07 2005
@@ -116,13 +116,14 @@
}
public void map(WritableComparable key, Writable value,
- OutputCollector collector) throws IOException {
+ OutputCollector collector, Reporter reporter)
+ throws IOException {
String name = ((UTF8)key).toString();
long size = ((LongWritable)value).get();
long seed = Long.parseLong(name);
random.setSeed(seed);
- //LOG.info("writing: name="+name+" size="+size);
+ reporter.setStatus("creating " + name);
OutputStream out = fs.create(new File(DATA_DIR, name));
@@ -134,12 +135,15 @@
int length = (remains<=buffer.length) ? (int)remains : buffer.length;
out.write(buffer, 0, length);
written += length;
+ reporter.setStatus("writing "+name+"@"+written+"/"+size);
}
} finally {
out.close();
}
collector.collect(new UTF8("bytes"), new LongWritable(written));
+
+ reporter.setStatus("wrote " + name);
}
}
@@ -188,36 +192,41 @@
}
public void map(WritableComparable key, Writable value,
- OutputCollector collector) throws IOException {
+ OutputCollector collector, Reporter reporter)
+ throws IOException {
String name = ((UTF8)key).toString();
long size = ((LongWritable)value).get();
long seed = Long.parseLong(name);
random.setSeed(seed);
- //LOG.info("reading: name="+name+" size="+size);
+ reporter.setStatus("opening " + name);
- InputStream in = fs.open(new File(DATA_DIR, name));
+ DataInputStream in =
+ new DataInputStream(fs.open(new File(DATA_DIR, name)));
long read = 0;
try {
while (read < size) {
long remains = size - read;
- int req = (remains<=buffer.length) ? (int)remains : buffer.length;
- int got = in.read(buffer, 0, req);
- read += got;
- assertEquals(got, req);
+ int n = (remains<=buffer.length) ? (int)remains : buffer.length;
+ in.readFully(buffer, 0, n);
+ read += n;
random.nextBytes(check);
- if (got != buffer.length) {
- Arrays.fill(buffer, got, buffer.length, (byte)0);
- Arrays.fill(check, got, check.length, (byte)0);
+ if (n != buffer.length) {
+ Arrays.fill(buffer, n, buffer.length, (byte)0);
+ Arrays.fill(check, n, check.length, (byte)0);
}
assertTrue(Arrays.equals(buffer, check));
+ reporter.setStatus("reading "+name+"@"+read+"/"+size);
+
}
} finally {
in.close();
}
collector.collect(new UTF8("bytes"), new LongWritable(read));
+
+ reporter.setStatus("read " + name);
}
}