You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/07/14 23:33:24 UTC
svn commit: r219110 - in
/lucene/nutch/branches/mapred/src/java/org/apache/nutch:
mapred/ReduceTask.java searcher/FetchedSegments.java
Author: cutting
Date: Thu Jul 14 14:33:22 2005
New Revision: 219110
URL: http://svn.apache.org/viewcvs?rev=219110&view=rev
Log:
Make mapred output directory listings, when sorted lexicographically, correspond to the partitioning.
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/searcher/FetchedSegments.java
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java?rev=219110&r1=219109&r2=219110&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java Thu Jul 14 14:33:22 2005
@@ -23,6 +23,7 @@
import java.io.*;
import java.net.*;
import java.util.*;
+import java.text.*;
/** A Reduce task. */
public class ReduceTask extends Task {
@@ -185,7 +186,7 @@
umbilical.progress(getTaskId(), new FloatWritable(2.0f/3.0f));
// make output collector
- String name = "part-" + getPartition();
+ String name = getOutputName(getPartition());
final RecordWriter out =
job.getOutputFormat().getRecordWriter(NutchFileSystem.get(), job, name);
OutputCollector collector = new OutputCollector() {
@@ -211,6 +212,20 @@
}
umbilical.progress(getTaskId(), new FloatWritable(3.0f/3.0f));
+ }
+
+ /** Construct output file names so that, when an output directory listing is
+ * sorted lexicographically, positions correspond to output partitions.*/
+
+ private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+
+ static {
+ NUMBER_FORMAT.setMinimumIntegerDigits(5);
+ NUMBER_FORMAT.setGroupingUsed(false);
+ }
+
+ private static synchronized String getOutputName(int partition) {
+ return "part-" + NUMBER_FORMAT.format(partition);
}
}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/searcher/FetchedSegments.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/searcher/FetchedSegments.java?rev=219110&r1=219109&r2=219110&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/searcher/FetchedSegments.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/searcher/FetchedSegments.java Thu Jul 14 14:33:22 2005
@@ -20,6 +20,7 @@
import java.io.File;
import java.util.HashMap;
+import java.util.Arrays;
import org.apache.nutch.io.*;
import org.apache.nutch.fs.*;
@@ -54,52 +55,47 @@
public byte[] getContent(UTF8 url) throws IOException {
synchronized (this) {
- if (content == null) {
- File[] parts = nfs.listFiles(new File(segmentDir, Content.DIR_NAME));
- content = new MapFile.Reader[parts.length];
- for (int i = 0; i < parts.length; i++) {
- content[i] = new MapFile.Reader(nfs, parts[i].toString());
- }
- }
+ if (content == null)
+ content = getReaders(Content.DIR_NAME);
}
-
- Content entry = new Content();
- content[url.hashCode()%content.length].get(url, entry);
- return entry.getContent();
+ return ((Content)getEntry(content, url, new Content())).getContent();
}
public ParseData getParseData(UTF8 url) throws IOException {
synchronized (this) {
- if (parseData == null) {
- File[] parts=nfs.listFiles(new File(segmentDir, ParseData.DIR_NAME));
- parseData = new MapFile.Reader[parts.length];
- for (int i = 0; i < parts.length; i++) {
- parseData[i] = new MapFile.Reader(nfs, parts[i].toString());
- }
- }
+ if (parseData == null)
+ parseData = getReaders(ParseData.DIR_NAME);
}
-
- ParseData entry = new ParseData();
- parseData[url.hashCode()%parseData.length].get(url, entry);
- return entry;
+ return (ParseData)getEntry(parseData, url, new ParseData());
}
public ParseText getParseText(UTF8 url) throws IOException {
synchronized (this) {
- if (parseText == null) {
- File[] parts=nfs.listFiles(new File(segmentDir, ParseText.DIR_NAME));
- parseText = new MapFile.Reader[parts.length];
- for (int i = 0; i < parts.length; i++) {
- parseText[i] = new MapFile.Reader(nfs, parts[i].toString());
- }
- }
+ if (parseText == null)
+ parseText = getReaders(ParseText.DIR_NAME);
}
-
- ParseText entry = new ParseText();
- parseText[url.hashCode()%parseText.length].get(url, entry);
- return entry;
+ return (ParseText)getEntry(parseText, url, new ParseText());
}
+ private MapFile.Reader[] getReaders(String subDir) throws IOException {
+ File[] names = nfs.listFiles(new File(segmentDir, subDir));
+
+ // sort names, so that hash partitioning works
+ Arrays.sort(names);
+
+ MapFile.Reader[] parts = new MapFile.Reader[names.length];
+ for (int i = 0; i < names.length; i++) {
+ parts[i] = new MapFile.Reader(nfs, names[i].toString());
+ }
+ return parts;
+ }
+
+ // hash the url to figure out which part it's in
+ private Writable getEntry(MapFile.Reader[] readers, UTF8 url,
+ Writable entry) throws IOException {
+ return readers[url.hashCode()%readers.length].get(url, entry);
+ }
+
}
private HashMap segments = new HashMap();