You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/07/14 23:33:24 UTC

svn commit: r219110 - in /lucene/nutch/branches/mapred/src/java/org/apache/nutch: mapred/ReduceTask.java searcher/FetchedSegments.java

Author: cutting
Date: Thu Jul 14 14:33:22 2005
New Revision: 219110

URL: http://svn.apache.org/viewcvs?rev=219110&view=rev
Log:
Make mapred output directory listings, when sorted lexicographically, correspond to the partitioning.

Modified:
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/searcher/FetchedSegments.java

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java?rev=219110&r1=219109&r2=219110&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTask.java Thu Jul 14 14:33:22 2005
@@ -23,6 +23,7 @@
 import java.io.*;
 import java.net.*;
 import java.util.*;
+import java.text.*;
 
 /** A Reduce task. */
 public class ReduceTask extends Task {
@@ -185,7 +186,7 @@
     umbilical.progress(getTaskId(), new FloatWritable(2.0f/3.0f));
 
     // make output collector
-    String name = "part-" + getPartition();
+    String name = getOutputName(getPartition());
     final RecordWriter out =
       job.getOutputFormat().getRecordWriter(NutchFileSystem.get(), job, name);
     OutputCollector collector = new OutputCollector() {
@@ -211,6 +212,20 @@
     }
 
     umbilical.progress(getTaskId(), new FloatWritable(3.0f/3.0f));
+  }
+
+  /** Zero-padded, ungrouped partition numbers; locale pinned to keep ASCII digits. */
+  private static final NumberFormat NUMBER_FORMAT =
+    NumberFormat.getInstance(Locale.ENGLISH);
+  static {
+    NUMBER_FORMAT.setMinimumIntegerDigits(5);
+    NUMBER_FORMAT.setGroupingUsed(false);
+  }
+
+  /** Construct output file names so that, when an output directory listing is
+   * sorted lexicographically, positions correspond to output partitions. */
+  private static synchronized String getOutputName(int partition) {
+    return "part-" + NUMBER_FORMAT.format(partition);
   }
 
 }

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/searcher/FetchedSegments.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/searcher/FetchedSegments.java?rev=219110&r1=219109&r2=219110&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/searcher/FetchedSegments.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/searcher/FetchedSegments.java Thu Jul 14 14:33:22 2005
@@ -20,6 +20,7 @@
 import java.io.File;
 
 import java.util.HashMap;
+import java.util.Arrays;
 
 import org.apache.nutch.io.*;
 import org.apache.nutch.fs.*;
@@ -54,52 +55,47 @@
 
     public byte[] getContent(UTF8 url) throws IOException {
       synchronized (this) {
-        if (content == null) {
-          File[] parts = nfs.listFiles(new File(segmentDir, Content.DIR_NAME));
-          content = new MapFile.Reader[parts.length];
-          for (int i = 0; i < parts.length; i++) {
-            content[i] = new MapFile.Reader(nfs, parts[i].toString());
-          }
-        }
+        if (content == null)                     // first call: open the readers
+          content = getReaders(Content.DIR_NAME);
       }
-
-      Content entry = new Content();
-      content[url.hashCode()%content.length].get(url, entry);
-      return entry.getContent();
+      return ((Content)getEntry(content, url, new Content())).getContent();
     }
 
     public ParseData getParseData(UTF8 url) throws IOException {
       synchronized (this) {
-        if (parseData == null) {
-          File[] parts=nfs.listFiles(new File(segmentDir, ParseData.DIR_NAME));
-          parseData = new MapFile.Reader[parts.length];
-          for (int i = 0; i < parts.length; i++) {
-            parseData[i] = new MapFile.Reader(nfs, parts[i].toString());
-          }
-        }
+        if (parseData == null)                   // own cache, not the content readers
+          parseData = getReaders(ParseData.DIR_NAME);
       }
-      
-      ParseData entry = new ParseData();
-      parseData[url.hashCode()%parseData.length].get(url, entry);
-      return entry;
+      return (ParseData)getEntry(parseData, url, new ParseData());
     }
 
     public ParseText getParseText(UTF8 url) throws IOException {
       synchronized (this) {
-        if (parseText == null) {
-          File[] parts=nfs.listFiles(new File(segmentDir, ParseText.DIR_NAME));
-          parseText = new MapFile.Reader[parts.length];
-          for (int i = 0; i < parts.length; i++) {
-            parseText[i] = new MapFile.Reader(nfs, parts[i].toString());
-          }
-        }
+        if (parseText == null)                   // own cache, not the content readers
+          parseText = getReaders(ParseText.DIR_NAME);
       }
-      
-      ParseText entry = new ParseText();
-      parseText[url.hashCode()%parseText.length].get(url, entry);
-      return entry;
+      return (ParseText)getEntry(parseText, url, new ParseText());
     }
     
+    /** Opens a reader on each part listed under <code>subDir</code>. */
+    private MapFile.Reader[] getReaders(String subDir) throws IOException {
+      File[] partFiles = nfs.listFiles(new File(segmentDir, subDir));
+
+      // hash partitioning only works if readers are in sorted name order
+      Arrays.sort(partFiles);
+      int count = partFiles.length;
+      MapFile.Reader[] readers = new MapFile.Reader[count];
+      for (int p = 0; p < count; p++)
+        readers[p] = new MapFile.Reader(nfs, partFiles[p].toString());
+      return readers;
+    }
+
+    // hash the url to find which part it's in; sign bit masked so the index is never
+    // negative.  NOTE(review): must agree with the partitioner that wrote the parts.
+    private Writable getEntry(MapFile.Reader[] readers, UTF8 url, Writable entry)
+      throws IOException {
+      return readers[(url.hashCode() & Integer.MAX_VALUE) % readers.length].get(url, entry);
+
   }
 
   private HashMap segments = new HashMap();