You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/06/08 23:23:23 UTC
svn commit: r189640 - in
/lucene/nutch/branches/mapred/src/java/org/apache/nutch: crawl/Indexer.java
io/ObjectWritable.java ipc/RPC.java mapred/SequenceFileInputFormat.java
mapred/SequenceFileRecordReader.java parse/ParseImpl.java
Author: cutting
Date: Wed Jun 8 14:23:22 2005
New Revision: 189640
URL: http://svn.apache.org/viewcvs?rev=189640&view=rev
Log:
Incomplete version of MapReduce-based indexer.
Added:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/ObjectWritable.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ipc/RPC.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java
Added: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java?rev=189640&view=auto
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java (added)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java Wed Jun 8 14:23:22 2005
@@ -0,0 +1,215 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.*;
+import java.util.*;
+import java.util.logging.*;
+
+import org.apache.nutch.io.*;
+import org.apache.nutch.fs.*;
+import org.apache.nutch.net.*;
+import org.apache.nutch.util.*;
+import org.apache.nutch.mapred.*;
+import org.apache.nutch.parse.*;
+import org.apache.nutch.analysis.*;
+import org.apache.nutch.indexer.*;
+
+import org.apache.lucene.index.*;
+import org.apache.lucene.document.*;
+
+/** Builds a Lucene index from segment data with a MapReduce job.
+ *
+ * <p>{@link InputFormat} wraps every input value in an {@link ObjectWritable}
+ * so that the {@link Inlinks}, {@link ParseData} and {@link ParseText}
+ * records for a url can meet in a single {@link #reduce} call.  Each reduce
+ * emits at most one Lucene {@link Document}, which {@link OutputFormat} adds
+ * to an index under the job's output directory.
+ *
+ * <p>NOTE: incomplete -- the <code>index</code> methods only add the
+ * segments' parse-data directories as input; link-db input and the
+ * docNo/segment/digest fields are not wired in yet.
+ */
+public class Indexer extends NutchConfigured implements Reducer {
+
+  public static final Logger LOG =
+    LogFormatter.getLogger("org.apache.nutch.crawl.Indexer");
+
+  /** Wraps inputs in an {@link ObjectWritable}, to permit merging different
+   * types in reduce. */
+  public static class InputFormat extends SequenceFileInputFormat {
+    public RecordReader getRecordReader(NutchFileSystem fs, FileSplit split,
+                                        JobConf job) throws IOException {
+      return new SequenceFileRecordReader(fs, split) {
+          /** Read the next record into a new instance of this file's value
+           * class, stored inside the caller's {@link ObjectWritable}. */
+          public synchronized boolean next(Writable key, Writable value)
+            throws IOException {
+            ObjectWritable wrapper = (ObjectWritable)value;
+            try {
+              // the value class is per file, so refill the wrapper with a
+              // matching instance on every call
+              wrapper.set(getValueClass().newInstance());
+            } catch (Exception e) {
+              IOException ioe = new IOException(e.toString());
+              ioe.initCause(e);                   // keep the original trace
+              throw ioe;
+            }
+            return super.next(key, (Writable)wrapper.get());
+          }
+        };
+    }
+  }
+
+  /** Unwrap Lucene Documents created by reduce and add them to an index. */
+  public static class OutputFormat
+    implements org.apache.nutch.mapred.OutputFormat {
+
+    /** Create a writer that builds the index in a local temporary directory
+     * and copies it to <code>job.getOutputDir()/name</code> on close.  Any
+     * existing output at that location is deleted first. */
+    public RecordWriter getRecordWriter(final NutchFileSystem fs, JobConf job,
+                                        String name) throws IOException {
+      final File perm = new File(job.getOutputDir(), name);
+      final File temp = new File(job.getLocalDir(), "index-"
+                                 +Integer.toString(new Random().nextInt()));
+
+      fs.delete(perm);                            // delete old, if any
+
+      final IndexWriter writer =                  // build locally first
+        new IndexWriter(fs.startLocalOutput(perm, temp),
+                        new NutchDocumentAnalyzer(), true);
+
+      writer.mergeFactor = job.getInt("indexer.mergeFactor", 10);
+      writer.minMergeDocs = job.getInt("indexer.minMergeDocs", 100);
+      writer.maxMergeDocs =
+        job.getInt("indexer.maxMergeDocs", Integer.MAX_VALUE);
+      writer.setTermIndexInterval
+        (job.getInt("indexer.termIndexInterval", 128));
+      writer.maxFieldLength = job.getInt("indexer.max.tokens", 10000);
+
+      return new RecordWriter() {
+
+          public void write(WritableComparable key, Writable value)
+            throws IOException {                  // unwrap & index doc
+            writer.addDocument((Document)((ObjectWritable)value).get());
+          }
+
+          public void close() throws IOException {
+            try {
+              LOG.info("Optimizing index.");      // optimize & close index
+              writer.optimize();
+            } finally {
+              writer.close();                     // release even on failure
+            }
+            fs.completeLocalOutput(perm, temp);   // copy to ndfs
+          }
+        };
+    }
+  }
+
+  public Indexer() {
+    super(null);
+  }
+
+  /** Construct an Indexer. */
+  public Indexer(NutchConf conf) {
+    super(conf);
+  }
+
+  private boolean boostByLinkCount;
+  private float scorePower;
+
+  /** Read the boost settings used by {@link #reduce} from the job. */
+  public void configure(JobConf job) {
+    boostByLinkCount = job.getBoolean("indexer.boost.by.link.count", false);
+    scorePower = job.getFloat("indexer.score.power", 0.5f);
+  }
+
+  /** Merge the wrapped values collected for one url into a Lucene document.
+   * Urls lacking either parse text or parse data are skipped.  The document
+   * is boosted via {@link IndexSegment#calculateBoost} using the inlink
+   * count, passed through {@link IndexingFilters}, and collected wrapped in
+   * an {@link ObjectWritable}. */
+  public void reduce(WritableComparable key, Iterator values,
+                     OutputCollector output) throws IOException {
+    Inlinks inlinks = null;
+    ParseData parseData = null;
+    ParseText parseText = null;
+    while (values.hasNext()) {
+      Object value = ((ObjectWritable)values.next()).get(); // unwrap
+      if (value instanceof Inlinks) {
+        inlinks = (Inlinks)value;
+      } else if (value instanceof ParseData) {
+        parseData = (ParseData)value;
+      } else if (value instanceof ParseText) {
+        parseText = (ParseText)value;
+      } else {
+        // a wrapper may hold null, so don't call getClass() blindly
+        LOG.warning("Unrecognized type: "+
+                    (value == null ? null : value.getClass()));
+      }
+    }
+
+    if (parseText == null || parseData == null) {
+      return;                                     // only have inlinks
+    }
+
+    Document doc = new Document();
+
+    // TODO: add docno & segment, used to map from merged index back to
+    // segment files
+    //doc.add(Field.UnIndexed("docNo", Long.toString(docNo, 16)));
+    //doc.add(Field.UnIndexed("segment", segmentName));
+
+    // TODO: add digest, used by dedup
+    //doc.add(Field.UnIndexed("digest", fo.getMD5Hash().toString()));
+
+    // apply boost to all indexed fields
+    float boost =
+      IndexSegment.calculateBoost(1.0f,scorePower, boostByLinkCount,
+                                  inlinks == null ? 0 : inlinks.size());
+    doc.setBoost(boost);
+    // store boost for use by explain and dedup
+    doc.add(Field.UnIndexed("boost", Float.toString(boost)));
+
+    try {
+      doc = IndexingFilters.filter(doc, new ParseImpl(parseText, parseData),
+                                   null);
+    } catch (IndexingException e) {
+      LOG.warning("Error indexing "+key+": "+e);
+      return;
+    }
+
+    output.collect(key, new ObjectWritable(doc));
+  }
+
+  /** Index the parse data of every segment under <code>segmentsDir</code>
+   * into <code>indexDir</code>. */
+  public void index(File indexDir, File segmentsDir) throws IOException {
+    JobConf job = Indexer.createJob(getConf(), indexDir);
+    job.setInputDir(segmentsDir);
+    job.set("mapred.input.subdir", ParseData.DIR_NAME);
+    JobClient.runJob(job);
+  }
+
+  /** Index the parse data of the given segments into
+   * <code>indexDir</code>. */
+  public void index(File indexDir, File[] segments) throws IOException {
+    JobConf job = Indexer.createJob(getConf(), indexDir);
+    for (int i = 0; i < segments.length; i++) {
+      job.addInputDir(new File(segments[i], ParseData.DIR_NAME));
+    }
+    JobClient.runJob(job);
+  }
+
+  /** Build the job configuration shared by both <code>index</code> methods;
+   * callers still need to set the input directories. */
+  private static JobConf createJob(NutchConf config, File indexDir) {
+    JobConf job = new JobConf(config);
+
+    job.setInputFormat(InputFormat.class);
+    job.setInputKeyClass(UTF8.class);
+    job.setInputValueClass(ObjectWritable.class);
+
+    // Indexer implements only Reducer, so it must not be registered as the
+    // mapper.  InputFormat already wraps values in ObjectWritable, so no
+    // mapping is needed.  NOTE(review): relies on JobConf's default
+    // (identity) mapper -- confirm.
+    //job.setCombinerClass(Indexer.class);
+    job.setReducerClass(Indexer.class);
+
+    job.setOutputDir(indexDir);
+    job.setOutputFormat(OutputFormat.class);
+    job.setOutputKeyClass(UTF8.class);
+    job.setOutputValueClass(ObjectWritable.class);
+
+    return job;
+  }
+
+  /** Command-line entry: <code>Indexer &lt;index&gt; &lt;segments&gt;</code>. */
+  public static void main(String[] args) throws Exception {
+    if (args.length < 2) {
+      System.err.println("Usage: <index> <segments>");
+      return;
+    }
+
+    Indexer indexer = new Indexer(NutchConf.get());
+    indexer.index(new File(args[0]), new File(args[1]));
+  }
+
+}
Added: lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/ObjectWritable.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/ObjectWritable.java?rev=189640&view=auto
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/ObjectWritable.java (added)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/ObjectWritable.java Wed Jun 8 14:23:22 2005
@@ -0,0 +1,250 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.io;
+
+import java.lang.reflect.Proxy;
+import java.lang.reflect.Method;
+import java.lang.reflect.Array;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+
+import java.io.*;
+import java.util.*;
+
+/** A polymorphic Writable that writes an instance with its class name.
+ * Handles arrays, strings and primitive types without a Writable wrapper.
+ *
+ * <p>Wire format:
+ * <ul>
+ * <li>The class name is written as a {@link UTF8} string, followed by the
+ * instance's own encoding.</li>
+ * <li>A null instance is written as the name of the private
+ * <code>NullInstance</code> class, followed by the declared class name.</li>
+ * <li>Array elements are written recursively, so each element carries its
+ * own class name.</li>
+ * </ul>
+ */
+public class ObjectWritable implements Writable {
+
+  private Class declaredClass;
+  private Object instance;
+
+  /** Create an empty wrapper, to be filled by {@link #readFields}. */
+  public ObjectWritable() {}
+
+  /** Wrap <code>instance</code>, declared as its runtime class.  The
+   * instance must not be null. */
+  public ObjectWritable(Object instance) {
+    set(instance);
+  }
+
+  /** Wrap <code>instance</code> as <code>declaredClass</code>.  Use this
+   * form for primitives (e.g. <code>Integer.TYPE</code> with an
+   * <code>Integer</code> value) and for null instances. */
+  public ObjectWritable(Class declaredClass, Object instance) {
+    this.declaredClass = declaredClass;
+    this.instance = instance;
+  }
+
+  /** Return the instance, or null if none. */
+  public Object get() { return instance; }
+
+  /** Return the class this is meant to be. */
+  public Class getDeclaredClass() { return declaredClass; }
+
+  /** Reset the instance; its declared class becomes its runtime class.
+   * @throws NullPointerException if <code>instance</code> is null */
+  public void set(Object instance) {
+    this.declaredClass = instance.getClass();
+    this.instance = instance;
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    readObject(in, this);
+  }
+
+  public void write(DataOutput out) throws IOException {
+    writeObject(out, instance, declaredClass);
+  }
+
+  /** Primitive type names, which {@link Class#forName} cannot resolve. */
+  private static final Map PRIMITIVE_NAMES = new HashMap();
+  static {
+    PRIMITIVE_NAMES.put("boolean", Boolean.TYPE);
+    PRIMITIVE_NAMES.put("byte", Byte.TYPE);
+    PRIMITIVE_NAMES.put("char", Character.TYPE);
+    PRIMITIVE_NAMES.put("short", Short.TYPE);
+    PRIMITIVE_NAMES.put("int", Integer.TYPE);
+    PRIMITIVE_NAMES.put("long", Long.TYPE);
+    PRIMITIVE_NAMES.put("float", Float.TYPE);
+    PRIMITIVE_NAMES.put("double", Double.TYPE);
+    PRIMITIVE_NAMES.put("void", Void.TYPE);
+  }
+
+  /** Resolve a class name written by {@link #writeObject}, including
+   * primitive type names.
+   * @throws RuntimeException wrapping the ClassNotFoundException if the
+   * class cannot be found */
+  private static Class loadClass(String className) {
+    Class theClass = (Class)PRIMITIVE_NAMES.get(className);
+    if (theClass == null) {
+      try {
+        theClass = Class.forName(className);
+      } catch (ClassNotFoundException e) {
+        // keep the original message, and the cause for debugging
+        throw new RuntimeException(e.toString(), e);
+      }
+    }
+    return theClass;
+  }
+
+  /** Placeholder written in place of a null instance; records only the
+   * declared class. */
+  private static class NullInstance implements Writable {
+    private Class declaredClass;
+    public NullInstance() {}
+    public NullInstance(Class declaredClass) {
+      this.declaredClass = declaredClass;
+    }
+    public void readFields(DataInput in) throws IOException {
+      declaredClass = loadClass(UTF8.readString(in));
+    }
+    public void write(DataOutput out) throws IOException {
+      UTF8.writeString(out, declaredClass.getName());
+    }
+  }
+
+  /** Write a {@link Writable}, {@link String}, primitive type, or an array of
+   * the preceding.
+   * @param out the output to write to
+   * @param instance the value; for primitive types, the matching wrapper
+   * object (e.g. an <code>Integer</code> for <code>Integer.TYPE</code>); may
+   * be null
+   * @param declaredClass the class to record for non-Writable values
+   * @throws IOException if the value is of an unsupported type */
+  public static void writeObject(DataOutput out, Object instance,
+                                 Class declaredClass) throws IOException {
+
+    if (instance == null) {                       // null
+      instance = new NullInstance(declaredClass);
+      declaredClass = NullInstance.class;
+    }
+
+    if (instance instanceof Writable) {           // Writable
+
+      // write instance's class, to support subclasses of the declared class
+      UTF8.writeString(out, instance.getClass().getName());
+
+      ((Writable)instance).write(out);
+
+      return;
+    }
+
+    // write declared class for primitives, as they can't be subclassed, and
+    // the class of the instance may be a wrapper
+    UTF8.writeString(out, declaredClass.getName());
+
+    if (declaredClass.isArray()) {                // array
+      int length = Array.getLength(instance);
+      out.writeInt(length);
+      for (int i = 0; i < length; i++) {
+        writeObject(out, Array.get(instance, i),
+                    declaredClass.getComponentType());
+      }
+
+    } else if (declaredClass == String.class) {   // String
+      UTF8.writeString(out, (String)instance);
+
+    } else if (declaredClass.isPrimitive()) {     // primitive type
+
+      if (declaredClass == Boolean.TYPE) {        // boolean
+        out.writeBoolean(((Boolean)instance).booleanValue());
+      } else if (declaredClass == Character.TYPE) { // char
+        out.writeChar(((Character)instance).charValue());
+      } else if (declaredClass == Byte.TYPE) {    // byte
+        out.writeByte(((Byte)instance).byteValue());
+      } else if (declaredClass == Short.TYPE) {   // short
+        out.writeShort(((Short)instance).shortValue());
+      } else if (declaredClass == Integer.TYPE) { // int
+        out.writeInt(((Integer)instance).intValue());
+      } else if (declaredClass == Long.TYPE) {    // long
+        out.writeLong(((Long)instance).longValue());
+      } else if (declaredClass == Float.TYPE) {   // float
+        out.writeFloat(((Float)instance).floatValue());
+      } else if (declaredClass == Double.TYPE) {  // double
+        out.writeDouble(((Double)instance).doubleValue());
+      } else if (declaredClass == Void.TYPE) {    // void
+      } else {
+        throw new IllegalArgumentException("Not a primitive: "+declaredClass);
+      }
+
+    } else {
+      throw new IOException("Can't write: "+instance+" as "+declaredClass);
+    }
+  }
+
+
+  /** Read a {@link Writable}, {@link String}, primitive type, or an array of
+   * the preceding. */
+  public static Object readObject(DataInput in)
+    throws IOException {
+    return readObject(in, null);
+  }
+
+  /** Read a {@link Writable}, {@link String}, primitive type, or an array of
+   * the preceding.  Primitive values are returned as their wrapper objects.
+   * @param objectWritable if non-null, receives the declared class and the
+   * instance read
+   * @return the instance read, possibly null */
+  public static Object readObject(DataInput in, ObjectWritable objectWritable)
+    throws IOException {
+    Class declaredClass = loadClass(UTF8.readString(in));
+
+    Object instance;
+
+    if (declaredClass == NullInstance.class) {    // null
+      NullInstance wrapper = new NullInstance();
+      wrapper.readFields(in);
+      declaredClass = wrapper.declaredClass;
+      instance = null;
+
+    } else if (declaredClass.isPrimitive()) {     // primitive types
+
+      if (declaredClass == Boolean.TYPE) {        // boolean
+        instance = Boolean.valueOf(in.readBoolean());
+      } else if (declaredClass == Character.TYPE) { // char
+        instance = new Character(in.readChar());
+      } else if (declaredClass == Byte.TYPE) {    // byte
+        instance = new Byte(in.readByte());
+      } else if (declaredClass == Short.TYPE) {   // short
+        instance = new Short(in.readShort());
+      } else if (declaredClass == Integer.TYPE) { // int
+        instance = new Integer(in.readInt());
+      } else if (declaredClass == Long.TYPE) {    // long
+        instance = new Long(in.readLong());
+      } else if (declaredClass == Float.TYPE) {   // float
+        instance = new Float(in.readFloat());
+      } else if (declaredClass == Double.TYPE) {  // double
+        instance = new Double(in.readDouble());
+      } else if (declaredClass == Void.TYPE) {    // void
+        instance = null;
+      } else {
+        throw new IllegalArgumentException("Not a primitive: "+declaredClass);
+      }
+
+    } else if (declaredClass.isArray()) {         // array
+      int length = in.readInt();
+      instance = Array.newInstance(declaredClass.getComponentType(), length);
+      for (int i = 0; i < length; i++) {
+        Array.set(instance, i, readObject(in));
+      }
+
+    } else if (declaredClass == String.class) {   // String
+      instance = UTF8.readString(in);
+
+    } else {                                      // Writable
+      try {
+        Writable writable = (Writable)declaredClass.newInstance();
+        writable.readFields(in);
+        instance = writable;
+      } catch (InstantiationException e) {
+        throw new RuntimeException(e);
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    if (objectWritable != null) {                 // store values
+      objectWritable.declaredClass = declaredClass;
+      objectWritable.instance = instance;
+    }
+
+    return instance;
+
+  }
+
+}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ipc/RPC.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ipc/RPC.java?rev=189640&r1=189639&r2=189640&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ipc/RPC.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ipc/RPC.java Wed Jun 8 14:23:22 2005
@@ -54,182 +54,6 @@
private RPC() {} // no public ctor
- private static final Map PRIMITIVE_NAMES = new HashMap();
- static {
- PRIMITIVE_NAMES.put("boolean", Boolean.TYPE);
- PRIMITIVE_NAMES.put("byte", Byte.TYPE);
- PRIMITIVE_NAMES.put("char", Character.TYPE);
- PRIMITIVE_NAMES.put("short", Short.TYPE);
- PRIMITIVE_NAMES.put("int", Integer.TYPE);
- PRIMITIVE_NAMES.put("long", Long.TYPE);
- PRIMITIVE_NAMES.put("float", Float.TYPE);
- PRIMITIVE_NAMES.put("double", Double.TYPE);
- PRIMITIVE_NAMES.put("void", Void.TYPE);
- }
-
- private static class NullInstance implements Writable {
- private Class declaredClass;
- public NullInstance() {}
- public NullInstance(Class declaredClass) {
- this.declaredClass = declaredClass;
- }
- public void readFields(DataInput in) throws IOException {
- String className = UTF8.readString(in);
- declaredClass = (Class)PRIMITIVE_NAMES.get(className);
- if (declaredClass == null) {
- try {
- declaredClass = Class.forName(className);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(e.toString());
- }
- }
- }
- public void write(DataOutput out) throws IOException {
- UTF8.writeString(out, declaredClass.getName());
- }
- }
-
- private static void writeObject(DataOutput out, Object instance,
- Class declaredClass) throws IOException {
-
- if (instance == null) { // null
- instance = new NullInstance(declaredClass);
- declaredClass = NullInstance.class;
- }
-
- if (instance instanceof Writable) { // Writable
-
- // write instance's class, to support subclasses of the declared class
- UTF8.writeString(out, instance.getClass().getName());
-
- ((Writable)instance).write(out);
-
- return;
- }
-
- // write declared class for primitives, as they can't be subclassed, and
- // the class of the instance may be a wrapper
- UTF8.writeString(out, declaredClass.getName());
-
- if (declaredClass.isArray()) { // array
- int length = Array.getLength(instance);
- out.writeInt(length);
- for (int i = 0; i < length; i++) {
- writeObject(out, Array.get(instance, i),
- declaredClass.getComponentType());
- }
-
- } else if (declaredClass == String.class) { // String
- UTF8.writeString(out, (String)instance);
-
- } else if (declaredClass.isPrimitive()) { // primitive type
-
- if (declaredClass == Boolean.TYPE) { // boolean
- out.writeBoolean(((Boolean)instance).booleanValue());
- } else if (declaredClass == Character.TYPE) { // char
- out.writeChar(((Character)instance).charValue());
- } else if (declaredClass == Byte.TYPE) { // byte
- out.writeByte(((Byte)instance).byteValue());
- } else if (declaredClass == Short.TYPE) { // short
- out.writeShort(((Short)instance).shortValue());
- } else if (declaredClass == Integer.TYPE) { // int
- out.writeInt(((Integer)instance).intValue());
- } else if (declaredClass == Long.TYPE) { // long
- out.writeLong(((Long)instance).longValue());
- } else if (declaredClass == Float.TYPE) { // float
- out.writeFloat(((Float)instance).floatValue());
- } else if (declaredClass == Double.TYPE) { // double
- out.writeDouble(((Double)instance).doubleValue());
- } else if (declaredClass == Void.TYPE) { // void
- } else {
- throw new IllegalArgumentException("Not a primitive: "+declaredClass);
- }
-
- } else {
- throw new IOException("Can't write: "+instance+" as "+declaredClass);
- }
- }
-
-
- private static Object readObject(DataInput in)
- throws IOException {
- return readObject(in, null);
- }
-
- private static Object readObject(DataInput in, ObjectWritable objectWritable)
- throws IOException {
- String className = UTF8.readString(in);
- Class declaredClass = (Class)PRIMITIVE_NAMES.get(className);
- if (declaredClass == null) {
- try {
- declaredClass = Class.forName(className);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(e.toString());
- }
- }
-
- Object instance;
-
- if (declaredClass == NullInstance.class) { // null
- NullInstance wrapper = new NullInstance();
- wrapper.readFields(in);
- declaredClass = wrapper.declaredClass;
- instance = null;
-
- } else if (declaredClass.isPrimitive()) { // primitive types
-
- if (declaredClass == Boolean.TYPE) { // boolean
- instance = Boolean.valueOf(in.readBoolean());
- } else if (declaredClass == Character.TYPE) { // char
- instance = new Character(in.readChar());
- } else if (declaredClass == Byte.TYPE) { // byte
- instance = new Byte(in.readByte());
- } else if (declaredClass == Short.TYPE) { // short
- instance = new Short(in.readShort());
- } else if (declaredClass == Integer.TYPE) { // int
- instance = new Integer(in.readInt());
- } else if (declaredClass == Long.TYPE) { // long
- instance = new Long(in.readLong());
- } else if (declaredClass == Float.TYPE) { // float
- instance = new Float(in.readFloat());
- } else if (declaredClass == Double.TYPE) { // double
- instance = new Double(in.readDouble());
- } else if (declaredClass == Void.TYPE) { // void
- instance = null;
- } else {
- throw new IllegalArgumentException("Not a primitive: "+declaredClass);
- }
-
- } else if (declaredClass.isArray()) { // array
- int length = in.readInt();
- instance = Array.newInstance(declaredClass.getComponentType(), length);
- for (int i = 0; i < length; i++) {
- Array.set(instance, i, readObject(in));
- }
-
- } else if (declaredClass == String.class) { // String
- instance = UTF8.readString(in);
-
- } else { // Writable
- try {
- Writable writable = (Writable)declaredClass.newInstance();
- writable.readFields(in);
- instance = writable;
- } catch (InstantiationException e) {
- throw new RuntimeException(e);
- } catch (IllegalAccessException e) {
- throw new RuntimeException(e);
- }
- }
-
- if (objectWritable != null) { // store values
- objectWritable.declaredClass = declaredClass;
- objectWritable.instance = instance;
- }
-
- return instance;
-
- }
/** A method invocation, including the method name and its parameters.*/
private static class Invocation implements Writable {
@@ -261,8 +85,8 @@
ObjectWritable objectWritable = new ObjectWritable();
for (int i = 0; i < parameters.length; i++) {
- parameters[i] = readObject(in, objectWritable);
- parameterClasses[i] = objectWritable.declaredClass;
+ parameters[i] = ObjectWritable.readObject(in, objectWritable);
+ parameterClasses[i] = objectWritable.getDeclaredClass();
}
}
@@ -270,7 +94,7 @@
UTF8.writeString(out, methodName);
out.writeInt(parameterClasses.length);
for (int i = 0; i < parameterClasses.length; i++) {
- writeObject(out, parameters[i], parameterClasses[i]);
+ ObjectWritable.writeObject(out, parameters[i], parameterClasses[i]);
}
}
@@ -285,33 +109,6 @@
}
buffer.append(")");
return buffer.toString();
- }
-
- }
-
- /** A polymorphic Writable that packages a Writable with its class name.
- * Also handles arrays and strings w/o a Writable wrapper.
- */
- private static class ObjectWritable implements Writable {
- private Class declaredClass;
- private Object instance;
-
- public ObjectWritable() {}
-
- public ObjectWritable(Class declaredClass, Object instance) {
- this.declaredClass = declaredClass;
- this.instance = instance;
- }
-
- /** Return the instance. */
- public Object get() { return instance; }
-
- public void readFields(DataInput in) throws IOException {
- readObject(in, this);
- }
-
- public void write(DataOutput out) throws IOException {
- writeObject(out, instance, declaredClass);
}
}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java?rev=189640&r1=189639&r2=189640&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java Wed Jun 8 14:23:22 2005
@@ -47,36 +47,7 @@
public RecordReader getRecordReader(NutchFileSystem fs, FileSplit split,
JobConf job) throws IOException {
- // open the file and seek to the start of the split
- final SequenceFile.Reader in =
- new SequenceFile.Reader(fs, split.getFile().toString());
- final long end = split.getStart() + split.getLength();
-
- in.sync(split.getStart()); // sync to start
-
- return new RecordReader() {
- private boolean more = true;
-
- public synchronized boolean next(Writable key, Writable value)
- throws IOException {
- if (!more) return false;
- long pos = in.getPosition();
- boolean eof = in.next(key, value);
- if (pos >= end && in.syncSeen()) {
- more = false;
- } else {
- more = eof;
- }
- return more;
- }
-
- public synchronized long getPos() throws IOException {
- return in.getPosition();
- }
-
- public synchronized void close() throws IOException { in.close(); }
-
- };
+ return new SequenceFileRecordReader(fs, split);
}
}
Added: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java?rev=189640&view=auto
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java (added)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java Wed Jun 8 14:23:22 2005
@@ -0,0 +1,73 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.mapred;
+
+import java.io.IOException;
+import java.io.File;
+
+import org.apache.nutch.fs.NutchFileSystem;
+
+import org.apache.nutch.io.SequenceFile;
+import org.apache.nutch.io.Writable;
+import org.apache.nutch.io.WritableComparable;
+import org.apache.nutch.io.LongWritable;
+import org.apache.nutch.io.UTF8;
+
+/** A {@link RecordReader} for {@link SequenceFile}s.
+ *
+ * <p>Reads the records of a single {@link FileSplit}:
+ * <ul>
+ * <li>The reader is first synced to the split's start.</li>
+ * <li>Records are then returned until one that starts at or past the
+ * split's end is read right after a sync mark.</li>
+ * <li>That record is not returned; the next split, which also begins by
+ * syncing, is presumably the one that reads it.</li>
+ * </ul>
+ */
+public class SequenceFileRecordReader implements RecordReader {
+  private final SequenceFile.Reader in;
+  private final long end;
+  private boolean more = true;
+
+  /** Open the split's file and sync the reader to the split's start. */
+  public SequenceFileRecordReader(NutchFileSystem fs, FileSplit split)
+    throws IOException {
+    this.in = new SequenceFile.Reader(fs, split.getFile().toString());
+    this.end = split.getStart() + split.getLength();
+
+    in.sync(split.getStart());                    // sync to start
+  }
+
+
+  /** The class of key that must be passed to {@link
+   * #next(Writable,Writable)}. */
+  public Class getKeyClass() { return in.getKeyClass(); }
+
+  /** The class of value that must be passed to {@link
+   * #next(Writable,Writable)}. */
+  public Class getValueClass() { return in.getValueClass(); }
+
+  /** Read the next record of this split into <code>key</code> and
+   * <code>value</code>.
+   * @return true if a record of this split was read, false once the split
+   * (or the file) is exhausted */
+  public synchronized boolean next(Writable key, Writable value)
+    throws IOException {
+    if (!more) return false;
+    long pos = in.getPosition();
+    boolean gotRecord = in.next(key, value);      // true iff a record was read
+    if (pos >= end && in.syncSeen()) {
+      more = false;                 // past the split end: leave it to the next
+    } else {
+      more = gotRecord;
+    }
+    return more;
+  }
+
+  /** Return the underlying reader's current position in the file. */
+  public synchronized long getPos() throws IOException {
+    return in.getPosition();
+  }
+
+  /** Close the underlying {@link SequenceFile.Reader}. */
+  public synchronized void close() throws IOException { in.close(); }
+
+}
+
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java?rev=189640&r1=189639&r2=189640&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/parse/ParseImpl.java Wed Jun 8 14:23:22 2005
@@ -30,7 +30,11 @@
public ParseImpl() {}
public ParseImpl(String text, ParseData data) {
- this.text = new ParseText(text);
+ this(new ParseText(text), data);
+ }
+
+ public ParseImpl(ParseText text, ParseData data) {
+ this.text = text;
this.data = data;
}