You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/06/08 23:23:23 UTC

svn commit: r189640 - in /lucene/nutch/branches/mapred/src/java/org/apache/nutch: crawl/Indexer.java io/ObjectWritable.java ipc/RPC.java mapred/SequenceFileInputFormat.java mapred/SequenceFileRecordReader.java parse/ParseImpl.java

Author: cutting
Date: Wed Jun  8 14:23:22 2005
New Revision: 189640

URL: http://svn.apache.org/viewcvs?rev=189640&view=rev
Log:
Incomplete version of MapReduce-based indexer.

Added:
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/ObjectWritable.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java
Modified:
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ipc/RPC.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java

Added: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java?rev=189640&view=auto
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java (added)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java Wed Jun  8 14:23:22 2005
@@ -0,0 +1,215 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.*;
+import java.util.*;
+import java.util.logging.*;
+
+import org.apache.nutch.io.*;
+import org.apache.nutch.fs.*;
+import org.apache.nutch.net.*;
+import org.apache.nutch.util.*;
+import org.apache.nutch.mapred.*;
+import org.apache.nutch.parse.*;
+import org.apache.nutch.analysis.*;
+import org.apache.nutch.indexer.*;
+
+import org.apache.lucene.index.*;
+import org.apache.lucene.document.*;
+
+/** Indexes parsed pages with Lucene, merging parse data, text and inlinks. */
+public class Indexer extends NutchConfigured implements Reducer {
+
+  public static final Logger LOG =
+    LogFormatter.getLogger("org.apache.nutch.crawl.Indexer");
+
+  /** Wraps inputs in an {@link ObjectWritable}, to permit merging different
+   * types in reduce. */
+  public static class InputFormat extends SequenceFileInputFormat {
+    public RecordReader getRecordReader(NutchFileSystem fs, FileSplit split,
+                                        JobConf job) throws IOException {
+      return new SequenceFileRecordReader(fs, split) {
+          public synchronized boolean next(Writable key, Writable value)
+            throws IOException {
+            ObjectWritable wrapper = (ObjectWritable)value;
+            try {
+              wrapper.set(getValueClass().newInstance());
+            } catch (Exception e) {
+              throw new IOException(e.toString());
+            }
+            return super.next(key, (Writable)wrapper.get());
+          }
+        };
+    }
+  }
+
+  /** Unwrap Lucene Documents created by reduce and add them to an index. */
+  public static class OutputFormat
+    implements org.apache.nutch.mapred.OutputFormat {
+    public RecordWriter getRecordWriter(final NutchFileSystem fs, JobConf job,
+                                        String name) throws IOException {
+      final File perm = new File(job.getOutputDir(), name);
+      final File temp = new File(job.getLocalDir(), "index-"
+                                 +Integer.toString(new Random().nextInt()));
+
+      fs.delete(perm);                            // delete old, if any
+
+      final IndexWriter writer =                  // build locally first
+        new IndexWriter(fs.startLocalOutput(perm, temp),
+                        new NutchDocumentAnalyzer(), true);
+
+      writer.mergeFactor = job.getInt("indexer.mergeFactor", 10);
+      writer.minMergeDocs = job.getInt("indexer.minMergeDocs", 100);
+      writer.maxMergeDocs =
+        job.getInt("indexer.maxMergeDocs", Integer.MAX_VALUE);
+      writer.setTermIndexInterval
+        (job.getInt("indexer.termIndexInterval", 128));
+      writer.maxFieldLength = job.getInt("indexer.max.tokens", 10000);
+
+      return new RecordWriter() {
+
+          public void write(WritableComparable key, Writable value)
+            throws IOException {                  // unwrap & index doc
+            writer.addDocument((Document)((ObjectWritable)value).get());
+          }
+          
+          public void close() throws IOException {
+            LOG.info("Optimizing index.");        // optimize & close index
+            writer.optimize();
+            writer.close();
+            fs.completeLocalOutput(perm, temp);   // copy to ndfs
+          }
+        };
+    }
+  }
+
+  public Indexer() {
+    super(null);
+  }
+
+  /** Construct an Indexer. */
+  public Indexer(NutchConf conf) {
+    super(conf);
+  }
+
+  private boolean boostByLinkCount;
+  private float scorePower;
+
+  public void configure(JobConf job) {
+    boostByLinkCount = job.getBoolean("indexer.boost.by.link.count", false);
+    scorePower = job.getFloat("indexer.score.power", 0.5f);
+  }
+
+  public void reduce(WritableComparable key, Iterator values,
+                     OutputCollector output) throws IOException {
+    Inlinks inlinks = null;
+    ParseData parseData = null;
+    ParseText parseText = null;
+    while (values.hasNext()) {
+      Object value = ((ObjectWritable)values.next()).get(); // unwrap
+      if (value instanceof Inlinks) {
+        inlinks = (Inlinks)value;
+      } else if (value instanceof ParseData) {
+        parseData = (ParseData)value;
+      } else if (value instanceof ParseText) {
+        parseText = (ParseText)value;
+      } else {
+        LOG.warning("Unrecognized type: "+value.getClass());
+      }
+    }      
+
+    if (parseText == null || parseData == null) {
+      return;                                     // only have inlinks
+    }
+
+    Document doc = new Document();
+
+    // add docno & segment, used to map from merged index back to segment files
+    //doc.add(Field.UnIndexed("docNo", Long.toString(docNo, 16)));
+    //doc.add(Field.UnIndexed("segment", segmentName));
+
+    // add digest, used by dedup
+    //doc.add(Field.UnIndexed("digest", fo.getMD5Hash().toString()));
+
+    // Apply boost to all indexed fields.
+    float boost =
+      IndexSegment.calculateBoost(1.0f,scorePower, boostByLinkCount,
+                                  inlinks == null ? 0 : inlinks.size());
+    doc.setBoost(boost);
+    // store boost for use by explain and dedup
+    doc.add(Field.UnIndexed("boost", Float.toString(boost)));
+
+    try {
+      doc = IndexingFilters.filter(doc, new ParseImpl(parseText, parseData),
+                                   null);
+    } catch (IndexingException e) {
+      LOG.warning("Error indexing "+key+": "+e);
+      return;
+    }
+
+    output.collect(key, new ObjectWritable(doc));
+  }
+
+  public void index(File indexDir, File segmentsDir) throws IOException {
+    JobConf job = Indexer.createJob(getConf(), indexDir);
+    job.setInputDir(segmentsDir);
+    job.set("mapred.input.subdir", ParseData.DIR_NAME);
+    JobClient.runJob(job);
+  }
+
+  public void index(File indexDir, File[] segments) throws IOException {
+    JobConf job = Indexer.createJob(getConf(), indexDir);
+    for (int i = 0; i < segments.length; i++) {
+      job.addInputDir(new File(segments[i], ParseData.DIR_NAME));
+    }
+    JobClient.runJob(job);
+  }
+
+  private static JobConf createJob(NutchConf config, File indexDir) {
+    JobConf job = new JobConf(config);
+
+    job.setInputFormat(InputFormat.class);
+    job.setInputKeyClass(UTF8.class);
+    job.setInputValueClass(ObjectWritable.class);
+
+    job.setMapperClass(Indexer.class);
+    //job.setCombinerClass(Indexer.class);
+    job.setReducerClass(Indexer.class);
+
+    job.setOutputDir(indexDir);
+    job.setOutputFormat(OutputFormat.class);
+    job.setOutputKeyClass(UTF8.class);
+    job.setOutputValueClass(ObjectWritable.class);
+
+    return job;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Indexer indexer = new Indexer(NutchConf.get());
+    
+    if (args.length < 2) {
+      System.err.println("Usage: <linkdb> <segments>");
+      return;
+    }
+    
+    indexer.index(new File(args[0]), new File(args[1]));
+  }
+
+
+
+}

Added: lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/ObjectWritable.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/ObjectWritable.java?rev=189640&view=auto
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/ObjectWritable.java (added)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/ObjectWritable.java Wed Jun  8 14:23:22 2005
@@ -0,0 +1,250 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.io;
+
+import java.lang.reflect.Proxy;
+import java.lang.reflect.Method;
+import java.lang.reflect.Array;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+
+import java.io.*;
+import java.util.*;
+
+/** A polymorphic Writable that writes an instance with its class name.
+ * Handles arrays, strings and primitive types without a Writable wrapper.
+ */
+public class ObjectWritable implements Writable {
+
+  private Class declaredClass;
+  private Object instance;
+
+  public ObjectWritable() {}
+  
+  public ObjectWritable(Object instance) {
+    set(instance);
+  }
+
+  public ObjectWritable(Class declaredClass, Object instance) {
+    this.declaredClass = declaredClass;
+    this.instance = instance;
+  }
+
+  /** Return the instance, or null if none. */
+  public Object get() { return instance; }
+  
+  /** Return the class this is meant to be. */
+  public Class getDeclaredClass() { return declaredClass; }
+  
+  /** Reset the instance. */
+  public void set(Object instance) {
+    this.declaredClass = instance.getClass();
+    this.instance = instance;
+  }
+  
+  public void readFields(DataInput in) throws IOException {
+    readObject(in, this);
+  }
+  
+  public void write(DataOutput out) throws IOException {
+    writeObject(out, instance, declaredClass);
+  }
+
+  private static final Map PRIMITIVE_NAMES = new HashMap();
+  static {
+    PRIMITIVE_NAMES.put("boolean", Boolean.TYPE);
+    PRIMITIVE_NAMES.put("byte", Byte.TYPE);
+    PRIMITIVE_NAMES.put("char", Character.TYPE);
+    PRIMITIVE_NAMES.put("short", Short.TYPE);
+    PRIMITIVE_NAMES.put("int", Integer.TYPE);
+    PRIMITIVE_NAMES.put("long", Long.TYPE);
+    PRIMITIVE_NAMES.put("float", Float.TYPE);
+    PRIMITIVE_NAMES.put("double", Double.TYPE);
+    PRIMITIVE_NAMES.put("void", Void.TYPE);
+  }
+
+  private static class NullInstance implements Writable {
+    private Class declaredClass;
+    public NullInstance() {}
+    public NullInstance(Class declaredClass) {
+      this.declaredClass = declaredClass;
+    }
+    public void readFields(DataInput in) throws IOException {
+      String className = UTF8.readString(in);
+      declaredClass = (Class)PRIMITIVE_NAMES.get(className);
+      if (declaredClass == null) {
+        try {
+          declaredClass = Class.forName(className);
+        } catch (ClassNotFoundException e) {
+          throw new RuntimeException(e.toString());
+        }
+      }
+    }
+    public void write(DataOutput out) throws IOException {
+      UTF8.writeString(out, declaredClass.getName());
+    }
+  }
+
+  /** Write a {@link Writable}, {@link String}, primitive type, or an array of
+   * the preceding. */
+  public static void writeObject(DataOutput out, Object instance,
+                                 Class declaredClass) throws IOException {
+
+    if (instance == null) {                       // null
+      instance = new NullInstance(declaredClass);
+      declaredClass = NullInstance.class;
+    }
+
+    if (instance instanceof Writable) {           // Writable
+
+      // write instance's class, to support subclasses of the declared class
+      UTF8.writeString(out, instance.getClass().getName());
+      
+      ((Writable)instance).write(out);
+
+      return;
+    }
+
+    // write declared class for primitives, as they can't be subclassed, and
+    // the class of the instance may be a wrapper
+    UTF8.writeString(out, declaredClass.getName());
+
+    if (declaredClass.isArray()) {                // array
+      int length = Array.getLength(instance);
+      out.writeInt(length);
+      for (int i = 0; i < length; i++) {
+        writeObject(out, Array.get(instance, i),
+                    declaredClass.getComponentType());
+      }
+      
+    } else if (declaredClass == String.class) {   // String
+      UTF8.writeString(out, (String)instance);
+      
+    } else if (declaredClass.isPrimitive()) {     // primitive type
+
+      if (declaredClass == Boolean.TYPE) {        // boolean
+        out.writeBoolean(((Boolean)instance).booleanValue());
+      } else if (declaredClass == Character.TYPE) { // char
+        out.writeChar(((Character)instance).charValue());
+      } else if (declaredClass == Byte.TYPE) {    // byte
+        out.writeByte(((Byte)instance).byteValue());
+      } else if (declaredClass == Short.TYPE) {   // short
+        out.writeShort(((Short)instance).shortValue());
+      } else if (declaredClass == Integer.TYPE) { // int
+        out.writeInt(((Integer)instance).intValue());
+      } else if (declaredClass == Long.TYPE) {    // long
+        out.writeLong(((Long)instance).longValue());
+      } else if (declaredClass == Float.TYPE) {   // float
+        out.writeFloat(((Float)instance).floatValue());
+      } else if (declaredClass == Double.TYPE) {  // double
+        out.writeDouble(((Double)instance).doubleValue());
+      } else if (declaredClass == Void.TYPE) {    // void
+      } else {
+        throw new IllegalArgumentException("Not a primitive: "+declaredClass);
+      }
+      
+    } else {
+      throw new IOException("Can't write: "+instance+" as "+declaredClass);
+    }
+  }
+  
+  
+  /** Read a {@link Writable}, {@link String}, primitive type, or an array of
+   * the preceding. */
+  public static Object readObject(DataInput in)
+    throws IOException {
+    return readObject(in, null);
+  }
+    
+  /** Read a {@link Writable}, {@link String}, primitive type, or an array of
+   * the preceding. */
+  public static Object readObject(DataInput in, ObjectWritable objectWritable)
+    throws IOException {
+    String className = UTF8.readString(in);
+    Class declaredClass = (Class)PRIMITIVE_NAMES.get(className);
+    if (declaredClass == null) {
+      try {
+        declaredClass = Class.forName(className);
+      } catch (ClassNotFoundException e) {
+        throw new RuntimeException(e.toString());
+      }
+    }    
+
+    Object instance;
+    
+    if (declaredClass == NullInstance.class) {         // null
+      NullInstance wrapper = new NullInstance();
+      wrapper.readFields(in);
+      declaredClass = wrapper.declaredClass;
+      instance = null;
+
+    } else if (declaredClass.isPrimitive()) {          // primitive types
+
+      if (declaredClass == Boolean.TYPE) {             // boolean
+        instance = Boolean.valueOf(in.readBoolean());
+      } else if (declaredClass == Character.TYPE) {    // char
+        instance = new Character(in.readChar());
+      } else if (declaredClass == Byte.TYPE) {         // byte
+        instance = new Byte(in.readByte());
+      } else if (declaredClass == Short.TYPE) {        // short
+        instance = new Short(in.readShort());
+      } else if (declaredClass == Integer.TYPE) {      // int
+        instance = new Integer(in.readInt());
+      } else if (declaredClass == Long.TYPE) {         // long
+        instance = new Long(in.readLong());
+      } else if (declaredClass == Float.TYPE) {        // float
+        instance = new Float(in.readFloat());
+      } else if (declaredClass == Double.TYPE) {       // double
+        instance = new Double(in.readDouble());
+      } else if (declaredClass == Void.TYPE) {         // void
+        instance = null;
+      } else {
+        throw new IllegalArgumentException("Not a primitive: "+declaredClass);
+      }
+
+    } else if (declaredClass.isArray()) {              // array
+      int length = in.readInt();
+      instance = Array.newInstance(declaredClass.getComponentType(), length);
+      for (int i = 0; i < length; i++) {
+        Array.set(instance, i, readObject(in));
+      }
+      
+    } else if (declaredClass == String.class) {        // String
+      instance = UTF8.readString(in);
+      
+    } else {                                      // Writable
+      try {
+        Writable writable = (Writable)declaredClass.newInstance();
+        writable.readFields(in);
+        instance = writable;
+      } catch (InstantiationException e) {
+        throw new RuntimeException(e);
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    if (objectWritable != null) {                 // store values
+      objectWritable.declaredClass = declaredClass;
+      objectWritable.instance = instance;
+    }
+
+    return instance;
+      
+  }
+  
+}

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ipc/RPC.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ipc/RPC.java?rev=189640&r1=189639&r2=189640&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ipc/RPC.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ipc/RPC.java Wed Jun  8 14:23:22 2005
@@ -54,182 +54,6 @@
 
   private RPC() {}                                  // no public ctor
 
-  private static final Map PRIMITIVE_NAMES = new HashMap();
-  static {
-    PRIMITIVE_NAMES.put("boolean", Boolean.TYPE);
-    PRIMITIVE_NAMES.put("byte", Byte.TYPE);
-    PRIMITIVE_NAMES.put("char", Character.TYPE);
-    PRIMITIVE_NAMES.put("short", Short.TYPE);
-    PRIMITIVE_NAMES.put("int", Integer.TYPE);
-    PRIMITIVE_NAMES.put("long", Long.TYPE);
-    PRIMITIVE_NAMES.put("float", Float.TYPE);
-    PRIMITIVE_NAMES.put("double", Double.TYPE);
-    PRIMITIVE_NAMES.put("void", Void.TYPE);
-  }
-
-  private static class NullInstance implements Writable {
-    private Class declaredClass;
-    public NullInstance() {}
-    public NullInstance(Class declaredClass) {
-      this.declaredClass = declaredClass;
-    }
-    public void readFields(DataInput in) throws IOException {
-      String className = UTF8.readString(in);
-      declaredClass = (Class)PRIMITIVE_NAMES.get(className);
-      if (declaredClass == null) {
-        try {
-          declaredClass = Class.forName(className);
-        } catch (ClassNotFoundException e) {
-          throw new RuntimeException(e.toString());
-        }
-      }
-    }
-    public void write(DataOutput out) throws IOException {
-      UTF8.writeString(out, declaredClass.getName());
-    }
-  }
-
-  private static void writeObject(DataOutput out, Object instance,
-                                  Class declaredClass) throws IOException {
-
-    if (instance == null) {                       // null
-      instance = new NullInstance(declaredClass);
-      declaredClass = NullInstance.class;
-    }
-
-    if (instance instanceof Writable) {           // Writable
-
-      // write instance's class, to support subclasses of the declared class
-      UTF8.writeString(out, instance.getClass().getName());
-      
-      ((Writable)instance).write(out);
-
-      return;
-    }
-
-    // write declared class for primitives, as they can't be subclassed, and
-    // the class of the instance may be a wrapper
-    UTF8.writeString(out, declaredClass.getName());
-
-    if (declaredClass.isArray()) {                // array
-      int length = Array.getLength(instance);
-      out.writeInt(length);
-      for (int i = 0; i < length; i++) {
-        writeObject(out, Array.get(instance, i),
-                    declaredClass.getComponentType());
-      }
-      
-    } else if (declaredClass == String.class) {   // String
-      UTF8.writeString(out, (String)instance);
-      
-    } else if (declaredClass.isPrimitive()) {     // primitive type
-
-      if (declaredClass == Boolean.TYPE) {        // boolean
-        out.writeBoolean(((Boolean)instance).booleanValue());
-      } else if (declaredClass == Character.TYPE) { // char
-        out.writeChar(((Character)instance).charValue());
-      } else if (declaredClass == Byte.TYPE) {    // byte
-        out.writeByte(((Byte)instance).byteValue());
-      } else if (declaredClass == Short.TYPE) {   // short
-        out.writeShort(((Short)instance).shortValue());
-      } else if (declaredClass == Integer.TYPE) { // int
-        out.writeInt(((Integer)instance).intValue());
-      } else if (declaredClass == Long.TYPE) {    // long
-        out.writeLong(((Long)instance).longValue());
-      } else if (declaredClass == Float.TYPE) {   // float
-        out.writeFloat(((Float)instance).floatValue());
-      } else if (declaredClass == Double.TYPE) {  // double
-        out.writeDouble(((Double)instance).doubleValue());
-      } else if (declaredClass == Void.TYPE) {    // void
-      } else {
-        throw new IllegalArgumentException("Not a primitive: "+declaredClass);
-      }
-      
-    } else {
-      throw new IOException("Can't write: "+instance+" as "+declaredClass);
-    }
-  }
-  
-  
-  private static Object readObject(DataInput in)
-    throws IOException {
-    return readObject(in, null);
-  }
-    
-  private static Object readObject(DataInput in, ObjectWritable objectWritable)
-    throws IOException {
-    String className = UTF8.readString(in);
-    Class declaredClass = (Class)PRIMITIVE_NAMES.get(className);
-    if (declaredClass == null) {
-      try {
-        declaredClass = Class.forName(className);
-      } catch (ClassNotFoundException e) {
-        throw new RuntimeException(e.toString());
-      }
-    }    
-
-    Object instance;
-    
-    if (declaredClass == NullInstance.class) {         // null
-      NullInstance wrapper = new NullInstance();
-      wrapper.readFields(in);
-      declaredClass = wrapper.declaredClass;
-      instance = null;
-
-    } else if (declaredClass.isPrimitive()) {          // primitive types
-
-      if (declaredClass == Boolean.TYPE) {             // boolean
-        instance = Boolean.valueOf(in.readBoolean());
-      } else if (declaredClass == Character.TYPE) {    // char
-        instance = new Character(in.readChar());
-      } else if (declaredClass == Byte.TYPE) {         // byte
-        instance = new Byte(in.readByte());
-      } else if (declaredClass == Short.TYPE) {        // short
-        instance = new Short(in.readShort());
-      } else if (declaredClass == Integer.TYPE) {      // int
-        instance = new Integer(in.readInt());
-      } else if (declaredClass == Long.TYPE) {         // long
-        instance = new Long(in.readLong());
-      } else if (declaredClass == Float.TYPE) {        // float
-        instance = new Float(in.readFloat());
-      } else if (declaredClass == Double.TYPE) {       // double
-        instance = new Double(in.readDouble());
-      } else if (declaredClass == Void.TYPE) {         // void
-        instance = null;
-      } else {
-        throw new IllegalArgumentException("Not a primitive: "+declaredClass);
-      }
-
-    } else if (declaredClass.isArray()) {              // array
-      int length = in.readInt();
-      instance = Array.newInstance(declaredClass.getComponentType(), length);
-      for (int i = 0; i < length; i++) {
-        Array.set(instance, i, readObject(in));
-      }
-      
-    } else if (declaredClass == String.class) {        // String
-      instance = UTF8.readString(in);
-      
-    } else {                                      // Writable
-      try {
-        Writable writable = (Writable)declaredClass.newInstance();
-        writable.readFields(in);
-        instance = writable;
-      } catch (InstantiationException e) {
-        throw new RuntimeException(e);
-      } catch (IllegalAccessException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    if (objectWritable != null) {                 // store values
-      objectWritable.declaredClass = declaredClass;
-      objectWritable.instance = instance;
-    }
-
-    return instance;
-      
-  }
 
   /** A method invocation, including the method name and its parameters.*/
   private static class Invocation implements Writable {
@@ -261,8 +85,8 @@
 
       ObjectWritable objectWritable = new ObjectWritable();
       for (int i = 0; i < parameters.length; i++) {
-        parameters[i] = readObject(in, objectWritable);
-        parameterClasses[i] = objectWritable.declaredClass;
+        parameters[i] = ObjectWritable.readObject(in, objectWritable);
+        parameterClasses[i] = objectWritable.getDeclaredClass();
       }
     }
 
@@ -270,7 +94,7 @@
       UTF8.writeString(out, methodName);
       out.writeInt(parameterClasses.length);
       for (int i = 0; i < parameterClasses.length; i++) {
-        writeObject(out, parameters[i], parameterClasses[i]);
+        ObjectWritable.writeObject(out, parameters[i], parameterClasses[i]);
       }
     }
 
@@ -285,33 +109,6 @@
       }
       buffer.append(")");
       return buffer.toString();
-    }
-
-  }
-
-  /** A polymorphic Writable that packages a Writable with its class name.
-   * Also handles arrays and strings w/o a Writable wrapper.
-   */
-  private static class ObjectWritable implements Writable {
-    private Class declaredClass;
-    private Object instance;
-
-    public ObjectWritable() {}
-
-    public ObjectWritable(Class declaredClass, Object instance) {
-      this.declaredClass = declaredClass;
-      this.instance = instance;
-    }
-
-    /** Return the instance. */
-    public Object get() { return instance; }
-
-    public void readFields(DataInput in) throws IOException {
-      readObject(in, this);
-    }
-
-    public void write(DataOutput out) throws IOException {
-      writeObject(out, instance, declaredClass);
     }
 
   }

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java?rev=189640&r1=189639&r2=189640&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java Wed Jun  8 14:23:22 2005
@@ -47,36 +47,7 @@
   public RecordReader getRecordReader(NutchFileSystem fs, FileSplit split,
                                       JobConf job) throws IOException {
 
-    // open the file and seek to the start of the split
-    final SequenceFile.Reader in =
-      new SequenceFile.Reader(fs, split.getFile().toString());
-    final long end = split.getStart() + split.getLength();
-
-    in.sync(split.getStart());                    // sync to start
-
-    return new RecordReader() {
-        private boolean more = true;
-
-        public synchronized boolean next(Writable key, Writable value)
-          throws IOException {
-          if (!more) return false;
-          long pos = in.getPosition();
-          boolean eof = in.next(key, value);
-          if (pos >= end && in.syncSeen()) {
-            more = false;
-          } else {
-            more = eof;
-          }
-          return more;
-        }
-        
-        public synchronized long getPos() throws IOException {
-          return in.getPosition();
-        }
-
-        public synchronized void close() throws IOException { in.close(); }
-
-      };
+    return new SequenceFileRecordReader(fs, split);
   }
 
 }

Added: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java?rev=189640&view=auto
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java (added)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java Wed Jun  8 14:23:22 2005
@@ -0,0 +1,73 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.mapred;
+
+import java.io.IOException;
+import java.io.File;
+
+import org.apache.nutch.fs.NutchFileSystem;
+
+import org.apache.nutch.io.SequenceFile;
+import org.apache.nutch.io.Writable;
+import org.apache.nutch.io.WritableComparable;
+import org.apache.nutch.io.LongWritable;
+import org.apache.nutch.io.UTF8;
+
+/** A {@link RecordReader} for {@link SequenceFile}s. */
+public class SequenceFileRecordReader implements RecordReader {
+  private SequenceFile.Reader in;
+  private long end;
+  private boolean more = true;
+
+  public SequenceFileRecordReader(NutchFileSystem fs, FileSplit split)
+    throws IOException {
+    this.in = new SequenceFile.Reader(fs, split.getFile().toString());
+    this.end = split.getStart() + split.getLength();
+
+    in.sync(split.getStart());                    // sync to start
+  }
+
+
+  /** The class of key that must be passed to {@link
+   * #next(Writable,Writable)}. */
+  public Class getKeyClass() { return in.getKeyClass(); }
+
+  /** The class of value that must be passed to {@link
+   * #next(Writable,Writable)}. */
+  public Class getValueClass() { return in.getValueClass(); }
+  
+  public synchronized boolean next(Writable key, Writable value)
+    throws IOException {
+    if (!more) return false;
+    long pos = in.getPosition();
+    boolean eof = in.next(key, value);
+    if (pos >= end && in.syncSeen()) {
+      more = false;
+    } else {
+      more = eof;
+    }
+    return more;
+  }
+  
+  public synchronized long getPos() throws IOException {
+    return in.getPosition();
+  }
+  
+  public synchronized void close() throws IOException { in.close(); }
+  
+}
+

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java?rev=189640&r1=189639&r2=189640&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java Wed Jun  8 14:23:22 2005
@@ -30,7 +30,11 @@
   public ParseImpl() {}
 
   public ParseImpl(String text, ParseData data) {
-    this.text = new ParseText(text);
+    this(new ParseText(text), data);
+  }
+
+  public ParseImpl(ParseText text, ParseData data) {
+    this.text = text;
     this.data = data;
   }