You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by ku...@apache.org on 2008/12/02 15:59:21 UTC

svn commit: r722483 - in /lucene/nutch/trunk: CHANGES.txt src/java/org/apache/nutch/segment/ContentAsTextInputFormat.java

Author: kubes
Date: Tue Dec  2 06:59:21 2008
New Revision: 722483

URL: http://svn.apache.org/viewvc?rev=722483&view=rev
Log:
NUTCH-667: Input Format for working with Content in Hadoop Streaming

Added:
    lucene/nutch/trunk/src/java/org/apache/nutch/segment/ContentAsTextInputFormat.java
Modified:
    lucene/nutch/trunk/CHANGES.txt

Modified: lucene/nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/CHANGES.txt?rev=722483&r1=722482&r2=722483&view=diff
==============================================================================
--- lucene/nutch/trunk/CHANGES.txt (original)
+++ lucene/nutch/trunk/CHANGES.txt Tue Dec  2 06:59:21 2008
@@ -291,6 +291,8 @@
 107. NUTCH-647 - Resolve URLs tool (kubes)
 
 108. NUTCH-665 - Search Load Testing Tool (kubes)
+
+109. NUTCH-667 - Input Format for working with Content in Hadoop Streaming (kubes)
      
 Release 0.9 - 2007-04-02
 

Added: lucene/nutch/trunk/src/java/org/apache/nutch/segment/ContentAsTextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/segment/ContentAsTextInputFormat.java?rev=722483&view=auto
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/segment/ContentAsTextInputFormat.java (added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/segment/ContentAsTextInputFormat.java Tue Dec  2 06:59:21 2008
@@ -0,0 +1,94 @@
+package org.apache.nutch.segment;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.nutch.protocol.Content;
+
+/**
+ * An input format that reads Nutch Content objects and converts them to text,
+ * decoding the bytes as UTF-8 and replacing carriage returns and line feeds
+ * with spaces.  This format is useful for working with Nutch content objects
+ * in Hadoop Streaming with other languages.
+ */
+public class ContentAsTextInputFormat
+  extends SequenceFileInputFormat<Text, Text> {
+
+  /** Adapts a (Text, Content) sequence file reader to a (Text, Text) reader. */
+  private static class ContentAsTextRecordReader
+    implements RecordReader<Text, Text> {
+
+    private final SequenceFileRecordReader<Text, Content> sequenceFileRecordReader;
+
+    // holders for the underlying record, refilled on every call to next
+    private final Text innerKey;
+    private final Content innerValue;
+
+    public ContentAsTextRecordReader(Configuration conf, FileSplit split)
+      throws IOException {
+      sequenceFileRecordReader = new SequenceFileRecordReader<Text, Content>(
+        conf, split);
+      innerKey = (Text)sequenceFileRecordReader.createKey();
+      innerValue = (Content)sequenceFileRecordReader.createValue();
+    }
+
+    public Text createKey() {
+      return new Text();
+    }
+
+    public Text createValue() {
+      return new Text();
+    }
+
+    /**
+     * Reads the next record, copying its key into key and its content,
+     * converted to single-line text, into value.
+     * @return false when the underlying reader has no more records
+     * @throws IOException if the underlying read or the decoding fails
+     */
+    public synchronized boolean next(Text key, Text value) throws IOException {
+      if (!sequenceFileRecordReader.next(innerKey, innerValue)) {
+        return false;
+      }
+      key.set(innerKey);
+
+      // decode with a fixed charset; the platform default can differ per node
+      String contentAsStr = new String(innerValue.getContent(), "UTF-8");
+      // replace both line terminator characters so the value holds no line
+      // breaks; char replacement also avoids compiling a regex on every record
+      value.set(contentAsStr.replace('\r', ' ').replace('\n', ' '));
+      return true;
+    }
+
+    public float getProgress() throws IOException {
+      return sequenceFileRecordReader.getProgress();
+    }
+
+    public synchronized long getPos() throws IOException {
+      return sequenceFileRecordReader.getPos();
+    }
+
+    public synchronized void close() throws IOException {
+      sequenceFileRecordReader.close();
+    }
+  }
+
+  public ContentAsTextInputFormat() {
+    super();
+  }
+
+  /** Opens a reader over the given file split and reports the split as status. */
+  public RecordReader<Text, Text> getRecordReader(InputSplit split,
+    JobConf job, Reporter reporter) throws IOException {
+    reporter.setStatus(split.toString());
+    return new ContentAsTextRecordReader(job, (FileSplit)split);
+  }
+}