You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by si...@apache.org on 2007/01/25 19:12:00 UTC
svn commit: r499878 - in /lucene/nutch/trunk: CHANGES.txt
src/java/org/apache/nutch/indexer/Indexer.java
src/java/org/apache/nutch/segment/SegmentMerger.java
src/java/org/apache/nutch/segment/SegmentReader.java
Author: siren
Date: Thu Jan 25 10:11:59 2007
New Revision: 499878
URL: http://svn.apache.org/viewvc?view=rev&rev=499878
Log:
NUTCH-433 - java.io.EOFException in newer nightlies in mergesegs or indexing from hadoop.io.DataOutputBuffer
Modified:
lucene/nutch/trunk/CHANGES.txt
lucene/nutch/trunk/src/java/org/apache/nutch/indexer/Indexer.java
lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java
lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java
Modified: lucene/nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/CHANGES.txt?view=diff&rev=499878&r1=499877&r2=499878
==============================================================================
--- lucene/nutch/trunk/CHANGES.txt (original)
+++ lucene/nutch/trunk/CHANGES.txt Thu Jan 25 10:11:59 2007
@@ -139,6 +139,9 @@
45. NUTCH-68 - Add a tool to generate arbitrary fetchlists. (ab)
+46. NUTCH-433 - java.io.EOFException in newer nightlies in mergesegs
+ or indexing from hadoop.io.DataOutputBuffer (siren)
+
Release 0.8 - 2006-07-25
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/indexer/Indexer.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/indexer/Indexer.java?view=diff&rev=499878&r1=499877&r2=499878
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/indexer/Indexer.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/indexer/Indexer.java Thu Jan 25 10:11:59 2007
@@ -24,7 +24,6 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.*;
-import org.apache.nutch.fetcher.Fetcher;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;
@@ -51,41 +50,12 @@
import org.apache.nutch.metadata.Nutch;
/** Create indexes for segments. */
-public class Indexer extends ToolBase implements Reducer {
+public class Indexer extends ToolBase implements Reducer, Mapper {
public static final String DONE_NAME = "index.done";
public static final Log LOG = LogFactory.getLog(Indexer.class);
- /** Wraps inputs in an {@link ObjectWritable}, to permit merging different
- * types in reduce. */
- public static class InputFormat extends SequenceFileInputFormat {
- public RecordReader getRecordReader(FileSystem fs, FileSplit split,
- JobConf job, Reporter reporter)
- throws IOException {
-
- reporter.setStatus(split.toString());
-
- return new SequenceFileRecordReader(job, split) {
- public synchronized boolean next(Writable key, Writable value)
- throws IOException {
- ObjectWritable wrapper = (ObjectWritable)value;
- try {
- wrapper.set(getValueClass().newInstance());
- } catch (Exception e) {
- throw new IOException(e.toString());
- }
- return super.next(key, (Writable)wrapper.get());
- }
-
- // override the default - we want ObjectWritable-s here
- public Writable createValue() {
- return new ObjectWritable();
- }
- };
- }
- }
-
/** Unwrap Lucene Documents created by reduce and add them to an index. */
public static class OutputFormat
extends org.apache.hadoop.mapred.OutputFormatBase {
@@ -290,12 +260,9 @@
job.addInputPath(new Path(crawlDb, CrawlDb.CURRENT_NAME));
job.addInputPath(new Path(linkDb, LinkDb.CURRENT_NAME));
+ job.setInputFormat(SequenceFileInputFormat.class);
- job.setInputFormat(InputFormat.class);
- //job.setInputKeyClass(Text.class);
- //job.setInputValueClass(ObjectWritable.class);
-
- //job.setCombinerClass(Indexer.class);
+ job.setMapperClass(Indexer.class);
job.setReducerClass(Indexer.class);
job.setOutputPath(indexDir);
@@ -332,6 +299,11 @@
LOG.fatal("Indexer: " + StringUtils.stringifyException(e));
return -1;
}
+ }
+
+ public void map(WritableComparable key, Writable value,
+ OutputCollector output, Reporter reporter) throws IOException {
+ output.collect(key, new ObjectWritable(value));
}
}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java?view=diff&rev=499878&r1=499877&r2=499878
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java Thu Jan 25 10:11:59 2007
@@ -32,9 +32,7 @@
import org.apache.hadoop.util.Progressable;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.Generator;
-import org.apache.nutch.fetcher.Fetcher;
import org.apache.nutch.metadata.MetaWrapper;
-import org.apache.nutch.metadata.Metadata;
import org.apache.nutch.metadata.Nutch;
import org.apache.nutch.net.URLFilters;
import org.apache.nutch.parse.ParseData;
@@ -104,37 +102,61 @@
* types in reduce and use additional metadata.
*/
public static class ObjectInputFormat extends SequenceFileInputFormat {
- public RecordReader getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter)
- throws IOException {
-
- reporter.setStatus(split.toString());
+
+ @Override
+ public RecordReader getRecordReader(InputSplit split,
+ JobConf job, Reporter reporter) {
+
+ try{
+ reporter.setStatus(split.toString());
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot set status for reported:", e);
+ }
// find part name
- final SegmentPart segmentPart = SegmentPart.get(split);
- final String spString = segmentPart.toString();
-
- return new SequenceFileRecordReader(job, split) {
- public synchronized boolean next(Writable key, Writable value) throws IOException {
- MetaWrapper wrapper = (MetaWrapper) value;
- try {
- wrapper.set(getValueClass().newInstance());
- } catch (Exception e) {
- throw new IOException(e.toString());
+ SegmentPart segmentPart;
+ final String spString;
+ try {
+ segmentPart = SegmentPart.get((FileSplit) split);
+ spString = segmentPart.toString();
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot identify segment:", e);
+ }
+
+ try {
+ return new SequenceFileRecordReader(job, (FileSplit)split) {
+
+ @Override
+ public synchronized boolean next(Writable key, Writable value) throws IOException {
+ LOG.debug("Running OIF.next()");
+
+ MetaWrapper wrapper = (MetaWrapper) value;
+ try {
+ wrapper.set(getValueClass().newInstance());
+ } catch (Exception e) {
+ throw new IOException(e.toString());
+ }
+
+ boolean res = super.next(key, (Writable) wrapper.get());
+ wrapper.setMeta(SEGMENT_PART_KEY, spString);
+ return res;
+ }
+
+ @Override
+ public Writable createValue() {
+ return new MetaWrapper();
}
- boolean res = super.next(key, (Writable) wrapper.get());
- wrapper.setMeta(SEGMENT_PART_KEY, spString);
- return res;
- }
-
- public Writable createValue() {
- return new MetaWrapper();
- }
- };
+
+ };
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot create RecordReader: ", e);
+ }
}
}
public static class SegmentOutputFormat extends OutputFormatBase {
private static final String DEFAULT_SLICE = "default";
+ @Override
public RecordWriter getRecordWriter(final FileSystem fs, final JobConf job, final String name, final Progressable progress) throws IOException {
return new RecordWriter() {
MapFile.Writer c_out = null;
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java?view=diff&rev=499878&r1=499877&r2=499878
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java Thu Jan 25 10:11:59 2007
@@ -47,33 +47,6 @@
private boolean co, fe, ge, pa, pd, pt;
private FileSystem fs;
- /**
- * Wraps inputs in an {@link ObjectWritable}, to permit merging different
- * types in reduce.
- */
- public static class InputFormat extends SequenceFileInputFormat {
- public RecordReader getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter)
- throws IOException {
- reporter.setStatus(split.toString());
-
- return new SequenceFileRecordReader(job, split) {
- public synchronized boolean next(Writable key, Writable value) throws IOException {
- ObjectWritable wrapper = (ObjectWritable) value;
- try {
- wrapper.set(getValueClass().newInstance());
- } catch (Exception e) {
- throw new IOException(e.toString());
- }
- return super.next(key, (Writable) wrapper.get());
- }
-
- public Writable createValue() {
- return new ObjectWritable();
- }
- };
- }
- }
-
public static class InputCompatMapper extends MapReduceBase implements Mapper {
private Text newKey = new Text();
@@ -83,7 +56,7 @@
newKey.set(key.toString());
key = newKey;
}
- collector.collect(key, value);
+ collector.collect(key, new ObjectWritable(value));
}
}
@@ -198,7 +171,7 @@
if (pd) job.addInputPath(new Path(segment, ParseData.DIR_NAME));
if (pt) job.addInputPath(new Path(segment, ParseText.DIR_NAME));
- job.setInputFormat(InputFormat.class);
+ job.setInputFormat(SequenceFileInputFormat.class);
job.setMapperClass(InputCompatMapper.class);
job.setReducerClass(SegmentReader.class);