Posted to commits@nutch.apache.org by si...@apache.org on 2007/01/25 19:12:00 UTC

svn commit: r499878 - in /lucene/nutch/trunk: CHANGES.txt src/java/org/apache/nutch/indexer/Indexer.java src/java/org/apache/nutch/segment/SegmentMerger.java src/java/org/apache/nutch/segment/SegmentReader.java

Author: siren
Date: Thu Jan 25 10:11:59 2007
New Revision: 499878

URL: http://svn.apache.org/viewvc?view=rev&rev=499878
Log:
NUTCH-433

Modified:
    lucene/nutch/trunk/CHANGES.txt
    lucene/nutch/trunk/src/java/org/apache/nutch/indexer/Indexer.java
    lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java
    lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java

Modified: lucene/nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/CHANGES.txt?view=diff&rev=499878&r1=499877&r2=499878
==============================================================================
--- lucene/nutch/trunk/CHANGES.txt (original)
+++ lucene/nutch/trunk/CHANGES.txt Thu Jan 25 10:11:59 2007
@@ -139,6 +139,9 @@
 
 45. NUTCH-68 - Add a tool to generate arbitrary fetchlists. (ab)
 
+46. NUTCH-433 - java.io.EOFException in newer nightlies in mergesegs
+    or indexing from hadoop.io.DataOutputBuffer (siren)
+
 
 Release 0.8 - 2006-07-25
 

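In short, the patch removes the custom InputFormat inner classes that
wrapped every record in an ObjectWritable (or MetaWrapper) at read time
and moves that wrapping into the map phase over a plain
SequenceFileInputFormat, which works around the DataOutputBuffer-related
EOFException reported in NUTCH-433. A minimal sketch of the wrapping
pattern (WrappingMapper is a hypothetical name; the real map()
implementations appear in the diffs below):

    import java.io.IOException;

    import org.apache.hadoop.io.ObjectWritable;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    /** Sketch only: wraps each value in an ObjectWritable so a single
     *  reduce can merge records of different Writable types. */
    public class WrappingMapper extends MapReduceBase implements Mapper {
      public void map(WritableComparable key, Writable value,
          OutputCollector output, Reporter reporter) throws IOException {
        // ObjectWritable remembers the concrete class of 'value', so the
        // reducer can dispatch on wrapper.get() whatever the input was.
        output.collect(key, new ObjectWritable(value));
      }
    }
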
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/indexer/Indexer.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/indexer/Indexer.java?view=diff&rev=499878&r1=499877&r2=499878
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/indexer/Indexer.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/indexer/Indexer.java Thu Jan 25 10:11:59 2007
@@ -24,7 +24,6 @@
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.hadoop.io.*;
-import org.apache.nutch.fetcher.Fetcher;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.mapred.*;
@@ -51,41 +50,12 @@
 import org.apache.nutch.metadata.Nutch;
 
 /** Create indexes for segments. */
-public class Indexer extends ToolBase implements Reducer {
+public class Indexer extends ToolBase implements Reducer, Mapper {
   
   public static final String DONE_NAME = "index.done";
 
   public static final Log LOG = LogFactory.getLog(Indexer.class);
 
-  /** Wraps inputs in an {@link ObjectWritable}, to permit merging different
-   * types in reduce. */
-  public static class InputFormat extends SequenceFileInputFormat {
-    public RecordReader getRecordReader(FileSystem fs, FileSplit split,
-                                        JobConf job, Reporter reporter)
-      throws IOException {
-
-      reporter.setStatus(split.toString());
-      
-      return new SequenceFileRecordReader(job, split) {
-          public synchronized boolean next(Writable key, Writable value)
-            throws IOException {
-            ObjectWritable wrapper = (ObjectWritable)value;
-            try {
-              wrapper.set(getValueClass().newInstance());
-            } catch (Exception e) {
-              throw new IOException(e.toString());
-            }
-            return super.next(key, (Writable)wrapper.get());
-          }
-          
-          // override the default - we want ObjectWritable-s here
-          public Writable createValue() {
-            return new ObjectWritable();
-          }
-        };
-    }
-  }
-
   /** Unwrap Lucene Documents created by reduce and add them to an index. */
   public static class OutputFormat
     extends org.apache.hadoop.mapred.OutputFormatBase {
@@ -290,12 +260,9 @@
 
     job.addInputPath(new Path(crawlDb, CrawlDb.CURRENT_NAME));
     job.addInputPath(new Path(linkDb, LinkDb.CURRENT_NAME));
+    job.setInputFormat(SequenceFileInputFormat.class);
 
-    job.setInputFormat(InputFormat.class);
-    //job.setInputKeyClass(Text.class);
-    //job.setInputValueClass(ObjectWritable.class);
-
-    //job.setCombinerClass(Indexer.class);
+    job.setMapperClass(Indexer.class);
     job.setReducerClass(Indexer.class);
 
     job.setOutputPath(indexDir);
@@ -332,6 +299,11 @@
       LOG.fatal("Indexer: " + StringUtils.stringifyException(e));
       return -1;
     }
+  }
+
+  public void map(WritableComparable key, Writable value,
+      OutputCollector output, Reporter reporter) throws IOException {
+    output.collect(key, new ObjectWritable(value));
   }
 
 }

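The reduce side is untouched by this commit (hence absent from the
diff): Indexer.reduce() already receives ObjectWritable values and
unwraps them. A simplified sketch of that dispatch pattern, assuming
the usual instanceof checks (the real method does considerably more):

    public void reduce(WritableComparable key, Iterator values,
        OutputCollector output, Reporter reporter) throws IOException {
      while (values.hasNext()) {
        ObjectWritable wrapper = (ObjectWritable) values.next();
        Object value = wrapper.get();        // unwrap the concrete record
        if (value instanceof CrawlDatum) {
          // ... fold in crawldb/fetch status
        } else if (value instanceof ParseData) {
          // ... pick up title, metadata, outlink anchors
        } else if (value instanceof ParseText) {
          // ... feed the plain text to the Lucene document
        }
      }
      // ... build the Lucene Document for 'key' and output.collect() it
    }
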
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java?view=diff&rev=499878&r1=499877&r2=499878
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java Thu Jan 25 10:11:59 2007
@@ -32,9 +32,7 @@
 import org.apache.hadoop.util.Progressable;
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.crawl.Generator;
-import org.apache.nutch.fetcher.Fetcher;
 import org.apache.nutch.metadata.MetaWrapper;
-import org.apache.nutch.metadata.Metadata;
 import org.apache.nutch.metadata.Nutch;
 import org.apache.nutch.net.URLFilters;
 import org.apache.nutch.parse.ParseData;
@@ -104,37 +102,61 @@
    * types in reduce and use additional metadata.
    */
   public static class ObjectInputFormat extends SequenceFileInputFormat {
-    public RecordReader getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter)
-            throws IOException {
-
-      reporter.setStatus(split.toString());
+    
+    @Override
+    public RecordReader getRecordReader(InputSplit split,
+        JobConf job, Reporter reporter) {
+
+      try {
+        reporter.setStatus(split.toString());
+      } catch (IOException e) {
+        throw new RuntimeException("Cannot set status for reporter:", e);
+      }
       // find part name
-      final SegmentPart segmentPart = SegmentPart.get(split);
-      final String spString = segmentPart.toString();
-
-      return new SequenceFileRecordReader(job, split) {
-        public synchronized boolean next(Writable key, Writable value) throws IOException {
-          MetaWrapper wrapper = (MetaWrapper) value;
-          try {
-            wrapper.set(getValueClass().newInstance());
-          } catch (Exception e) {
-            throw new IOException(e.toString());
+      SegmentPart segmentPart;
+      final String spString;
+      try {
+        segmentPart = SegmentPart.get((FileSplit) split);
+        spString = segmentPart.toString();
+      } catch (IOException e) {
+        throw new RuntimeException("Cannot identify segment:", e);
+      }
+
+      try {
+        return new SequenceFileRecordReader(job, (FileSplit)split) {
+          
+          @Override
+          public synchronized boolean next(Writable key, Writable value) throws IOException {
+            LOG.debug("Running OIF.next()");
+            
+            MetaWrapper wrapper = (MetaWrapper) value;
+            try {
+              wrapper.set(getValueClass().newInstance());
+            } catch (Exception e) {
+              throw new IOException(e.toString());
+            }
+
+            boolean res = super.next(key, (Writable) wrapper.get());
+            wrapper.setMeta(SEGMENT_PART_KEY, spString);
+            return res;
+          }
+          
+          @Override
+          public Writable createValue() {
+            return new MetaWrapper();
           }
-          boolean res = super.next(key, (Writable) wrapper.get());
-          wrapper.setMeta(SEGMENT_PART_KEY, spString);
-          return res;
-        }
-        
-        public Writable createValue() {
-          return new MetaWrapper();
-        }
-      };
+          
+        };
+      } catch (IOException e) {
+        throw new RuntimeException("Cannot create RecordReader: ", e);
+      }
     }
   }
 
   public static class SegmentOutputFormat extends OutputFormatBase {
     private static final String DEFAULT_SLICE = "default";
     
+    @Override
     public RecordWriter getRecordWriter(final FileSystem fs, final JobConf job, final String name, final Progressable progress) throws IOException {
       return new RecordWriter() {
         MapFile.Writer c_out = null;

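MetaWrapper plays the same role here as ObjectWritable does in the
Indexer, but additionally carries per-record metadata: OIF.next() above
tags every record with the segment part it came from. A sketch of how
the reduce side can read that tag back (variable names are illustrative;
SegmentMerger.reduce() itself is not part of this diff):

    // Inside reduce(), for each value pulled from the iterator:
    MetaWrapper wrapper = (MetaWrapper) values.next();
    String spString = wrapper.getMeta(SEGMENT_PART_KEY); // set in OIF.next()
    SegmentPart sp = SegmentPart.parse(spString);  // e.g. "seg/crawl_fetch"
    Object o = wrapper.get();  // the wrapped CrawlDatum, ParseData, ...
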
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java?view=diff&rev=499878&r1=499877&r2=499878
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java Thu Jan 25 10:11:59 2007
@@ -47,33 +47,6 @@
   private boolean co, fe, ge, pa, pd, pt;
   private FileSystem fs;
 
-  /**
-   * Wraps inputs in an {@link ObjectWritable}, to permit merging different
-   * types in reduce.
-   */
-  public static class InputFormat extends SequenceFileInputFormat {
-    public RecordReader getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter)
-            throws IOException {
-      reporter.setStatus(split.toString());
-
-      return new SequenceFileRecordReader(job, split) {
-        public synchronized boolean next(Writable key, Writable value) throws IOException {
-          ObjectWritable wrapper = (ObjectWritable) value;
-          try {
-            wrapper.set(getValueClass().newInstance());
-          } catch (Exception e) {
-            throw new IOException(e.toString());
-          }
-          return super.next(key, (Writable) wrapper.get());
-        }
-        
-        public Writable createValue() {
-          return new ObjectWritable();
-        }
-      };
-    }
-  }
-  
   public static class InputCompatMapper extends MapReduceBase implements Mapper {
     private Text newKey = new Text();
 
@@ -83,7 +56,7 @@
         newKey.set(key.toString());
         key = newKey;
       }
-      collector.collect(key, value);
+      collector.collect(key, new ObjectWritable(value));
     }
     
   }
@@ -198,7 +171,7 @@
     if (pd) job.addInputPath(new Path(segment, ParseData.DIR_NAME));
     if (pt) job.addInputPath(new Path(segment, ParseText.DIR_NAME));
 
-    job.setInputFormat(InputFormat.class);
+    job.setInputFormat(SequenceFileInputFormat.class);
     job.setMapperClass(InputCompatMapper.class);
     job.setReducerClass(SegmentReader.class);
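
With the custom InputFormat gone here as well, InputCompatMapper now
does the ObjectWritable wrapping that the removed RecordReader used to
do, and the job reads its inputs through the stock
SequenceFileInputFormat. A minimal sketch of the resulting job wiring,
assuming a driver along the lines of SegmentReader's dump job (paths,
option flags, and output setup elided):

    JobConf job = new NutchJob(conf);  // conf: a Nutch Configuration
    job.addInputPath(new Path(segment, CrawlDatum.FETCH_DIR_NAME));
    // ... addInputPath() for the other selected segment parts
    job.setInputFormat(SequenceFileInputFormat.class); // stock reader
    job.setMapperClass(InputCompatMapper.class);       // wraps values
    job.setReducerClass(SegmentReader.class);
    // ... output path, format, and key/value classes as in SegmentReader
    JobClient.runJob(job);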