Posted to commits@nutch.apache.org by do...@apache.org on 2007/07/17 17:16:42 UTC
svn commit: r556946 - in /lucene/nutch/trunk: ./
src/java/org/apache/nutch/fetcher/ src/java/org/apache/nutch/parse/
src/java/org/apache/nutch/protocol/ src/java/org/apache/nutch/segment/
Author: dogacan
Date: Tue Jul 17 08:16:40 2007
New Revision: 556946
URL: http://svn.apache.org/viewvc?view=rev&rev=556946
Log:
NUTCH-506 - Delegate compression to Hadoop.
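With this change the segment writers stop hard-coding per-record compression and instead honor whatever SequenceFile compression the Hadoop job carries. A minimal sketch of selecting that compression on a job, assuming the 0.13-era JobConf API used elsewhere in Nutch (not part of this commit):

    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.mapred.JobConf;

    public class CompressionSetup {
      public static JobConf blockCompressedJob() {
        JobConf job = new JobConf();
        // Equivalent to setting io.seqfile.compression.type=BLOCK in hadoop-site.xml;
        // the writers patched below pick it up via SequenceFile.getCompressionType(job).
        SequenceFile.setCompressionType(job, CompressionType.BLOCK);
        return job;
      }
    }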
Modified:
lucene/nutch/trunk/CHANGES.txt
lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java
lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseOutputFormat.java
lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseSegment.java
lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseStatus.java
lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseText.java
lucene/nutch/trunk/src/java/org/apache/nutch/protocol/Content.java
lucene/nutch/trunk/src/java/org/apache/nutch/protocol/ProtocolStatus.java
lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java
Modified: lucene/nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/CHANGES.txt?view=diff&rev=556946&r1=556945&r2=556946
==============================================================================
--- lucene/nutch/trunk/CHANGES.txt (original)
+++ lucene/nutch/trunk/CHANGES.txt Tue Jul 17 08:16:40 2007
@@ -89,6 +89,8 @@
30. NUTCH-515 - Next fetch time is set incorrectly. (dogacan)
+31. NUTCH-506 - Nutch should delegate compression to Hadoop. (dogacan)
+
Release 0.9 - 2007-04-02
1. Changed log4j configuration to log to stdout on commandline
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java?view=diff&rev=556946&r1=556945&r2=556946
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java Tue Jul 17 08:16:40 2007
@@ -25,6 +25,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.Text;
@@ -57,10 +58,12 @@
new Path(new Path(job.getOutputPath(), CrawlDatum.FETCH_DIR_NAME), name);
final Path content =
new Path(new Path(job.getOutputPath(), Content.DIR_NAME), name);
+
+ final CompressionType compType = SequenceFile.getCompressionType(job);
final MapFile.Writer fetchOut =
new MapFile.Writer(job, fs, fetch.toString(), Text.class, CrawlDatum.class,
- CompressionType.NONE, progress);
+ compType, progress);
return new RecordWriter() {
private MapFile.Writer contentOut;
@@ -70,7 +73,7 @@
if (Fetcher.isStoringContent(job)) {
contentOut = new MapFile.Writer(job, fs, content.toString(),
Text.class, Content.class,
- CompressionType.NONE, progress);
+ compType, progress);
}
if (Fetcher.isParsing(job)) {
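crawl_fetch and content were previously always written with CompressionType.NONE; they now follow the job's setting. Readers are unaffected, because SequenceFile and MapFile record the compression in the file header. A minimal sketch of reading a crawl_fetch MapFile back, using a hypothetical dumpFetch helper that is not part of Nutch:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.MapFile;
    import org.apache.hadoop.io.Text;
    import org.apache.nutch.crawl.CrawlDatum;

    public class FetchDump {
      // Hypothetical helper: compression is detected from the file header, so the
      // same code reads data written with NONE, RECORD or BLOCK compression.
      public static void dumpFetch(Configuration conf, Path fetchDir) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        MapFile.Reader reader = new MapFile.Reader(fs, fetchDir.toString(), conf);
        Text key = new Text();
        CrawlDatum value = new CrawlDatum();
        while (reader.next(key, value)) {
          System.out.println(key + "\t" + value);
        }
        reader.close();
      }
    }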
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseOutputFormat.java?view=diff&rev=556946&r1=556945&r2=556946
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseOutputFormat.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseOutputFormat.java Tue Jul 17 08:16:40 2007
@@ -85,6 +85,7 @@
final float interval = job.getFloat("db.fetch.interval.default", 2592000.0f);
final boolean ignoreExternalLinks = job.getBoolean("db.ignore.external.links", false);
final int maxOutlinks = job.getInt("db.max.outlinks.per.page", 100);
+ final CompressionType compType = SequenceFile.getCompressionType(job);
Path text =
new Path(new Path(job.getOutputPath(), ParseText.DIR_NAME), name);
@@ -99,11 +100,11 @@
final MapFile.Writer dataOut =
new MapFile.Writer(job, fs, data.toString(), Text.class, ParseData.class,
- CompressionType.RECORD, progress);
+ compType, progress);
final SequenceFile.Writer crawlOut =
SequenceFile.createWriter(fs, job, crawl, Text.class, CrawlDatum.class,
- CompressionType.NONE, progress);
+ compType, progress);
return new RecordWriter() {
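parse_data previously forced CompressionType.RECORD and crawl_parse forced NONE; both now use the job's setting. If nothing is configured, SequenceFile.getCompressionType falls back to io.seqfile.compression.type, whose Hadoop default is RECORD. A condensed sketch of the pattern, with a hypothetical openCrawlOut helper (not part of this commit):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.util.Progressable;
    import org.apache.nutch.crawl.CrawlDatum;

    public class CrawlParseWriter {
      // Hypothetical helper mirroring the hunk above: one place decides the
      // compression type, every writer in the output format reuses it.
      public static SequenceFile.Writer openCrawlOut(FileSystem fs, JobConf job,
          Path crawl, Progressable progress) throws IOException {
        CompressionType compType = SequenceFile.getCompressionType(job);
        return SequenceFile.createWriter(fs, job, crawl, Text.class, CrawlDatum.class,
                                         compType, progress);
      }
    }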
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseSegment.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseSegment.java?view=diff&rev=556946&r1=556945&r2=556946
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseSegment.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseSegment.java Tue Jul 17 08:16:40 2007
@@ -69,7 +69,6 @@
key = newKey;
}
Content content = (Content) value;
- content.forceInflate();
ParseResult parseResult = null;
try {
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseStatus.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseStatus.java?view=diff&rev=556946&r1=556945&r2=556946
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseStatus.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseStatus.java Tue Jul 17 08:16:40 2007
@@ -25,7 +25,8 @@
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.hadoop.io.VersionedWritable;
+import org.apache.hadoop.io.VersionMismatchException;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.conf.Configuration;
@@ -35,9 +36,9 @@
/**
* @author Andrzej Bialecki <ab@getopt.org>
*/
-public class ParseStatus extends VersionedWritable {
+public class ParseStatus implements Writable {
- private final static byte VERSION = 1;
+ private final static byte VERSION = 2;
// Primary status codes:
@@ -136,17 +137,32 @@
}
public void readFields(DataInput in) throws IOException {
- super.readFields(in); // check version
- majorCode = in.readByte();
- minorCode = in.readShort();
- args = WritableUtils.readCompressedStringArray(in);
- }
+ byte version = in.readByte();
+ switch(version) {
+ case 1:
+ majorCode = in.readByte();
+ minorCode = in.readShort();
+ args = WritableUtils.readCompressedStringArray(in);
+ break;
+ case 2:
+ majorCode = in.readByte();
+ minorCode = in.readShort();
+ args = WritableUtils.readStringArray(in);
+ break;
+ default:
+ throw new VersionMismatchException(VERSION, version);
+ }
+ }
public void write(DataOutput out) throws IOException {
- super.write(out); // write out version
+ out.writeByte(VERSION);
out.writeByte(majorCode);
out.writeShort(minorCode);
- WritableUtils.writeCompressedStringArray(out, args);
+ if (args == null) {
+ out.writeInt(-1);
+ } else {
+ WritableUtils.writeStringArray(out, args);
+ }
}
/** A convenience method. Returns true if majorCode is SUCCESS, false
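Since ParseStatus no longer extends VersionedWritable, backward compatibility is handled by hand: version-1 records (compressed string arrays) remain readable, while version 2 writes plain strings. A hypothetical round-trip check, not part of this commit, that hand-crafts an old record and reads it back with the new readFields():

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.WritableUtils;
    import org.apache.nutch.parse.ParseStatus;

    public class ParseStatusCompatCheck {
      public static ParseStatus readOldRecord() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeByte(1);                          // old VERSION
        out.writeByte(ParseStatus.SUCCESS);        // majorCode
        out.writeShort(0);                         // minorCode
        WritableUtils.writeCompressedStringArray(out, new String[] { "ok" });

        ParseStatus status = new ParseStatus();
        status.readFields(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));
        return status;                             // status.isSuccess() == true
      }
    }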
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseText.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseText.java?view=diff&rev=556946&r1=556945&r2=556946
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseText.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseText.java Tue Jul 17 08:16:40 2007
@@ -26,10 +26,10 @@
/* The text conversion of page's content, stored using gzip compression.
* @see Parse#getText()
*/
-public final class ParseText extends VersionedWritable {
+public final class ParseText implements Writable {
public static final String DIR_NAME = "parse_text";
- private final static byte VERSION = 1;
+ private final static byte VERSION = 2;
public ParseText() {}
private String text;
@@ -38,18 +38,23 @@
this.text = text;
}
- public byte getVersion() { return VERSION; }
-
public void readFields(DataInput in) throws IOException {
- super.readFields(in); // check version
- text = WritableUtils.readCompressedString(in);
- return;
+ byte version = in.readByte();
+ switch (version) {
+ case 1:
+ text = WritableUtils.readCompressedString(in);
+ break;
+ case VERSION:
+ text = Text.readString(in);
+ break;
+ default:
+ throw new VersionMismatchException(VERSION, version);
+ }
}
public final void write(DataOutput out) throws IOException {
- super.write(out); // write version
- WritableUtils.writeCompressedString(out, text);
- return;
+ out.write(VERSION);
+ Text.writeString(out, text);
}
public final static ParseText read(DataInput in) throws IOException {
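ParseText drops the per-record gzip of WritableUtils.writeCompressedString in favor of plain Text strings; compression, if any, now happens at the SequenceFile/MapFile layer. Note that out.write(VERSION) emits a single byte, matching the readByte() on the read side. A hypothetical round-trip through the new version-2 format, not part of this commit:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.nutch.parse.ParseText;

    public class ParseTextRoundTrip {
      public static void roundTrip() throws IOException {
        ParseText original = new ParseText("the extracted page text");
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        ParseText copy = new ParseText();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));
        // copy.getText() now equals "the extracted page text"
      }
    }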
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/protocol/Content.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/protocol/Content.java?view=diff&rev=556946&r1=556945&r2=556946
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/protocol/Content.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/protocol/Content.java Tue Jul 17 08:16:40 2007
@@ -17,32 +17,35 @@
package org.apache.nutch.protocol;
+import java.io.ByteArrayInputStream;
import java.io.DataInput;
+import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
+import java.util.zip.InflaterInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayFile;
-import org.apache.hadoop.io.CompressedWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.VersionMismatchException;
+import org.apache.hadoop.io.Writable;
import org.apache.nutch.metadata.Metadata;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.mime.MimeType;
import org.apache.nutch.util.mime.MimeTypeException;
import org.apache.nutch.util.mime.MimeTypes;
-public final class Content extends CompressedWritable {
+public final class Content implements Writable {
public static final String DIR_NAME = "content";
- private final static byte VERSION = 2;
+ private final static int VERSION = -1;
- private byte version;
+ private int version;
private String url;
@@ -58,10 +61,8 @@
private MimeTypes mimeTypes;
- private boolean inflated;
-
public Content() {
- inflated = false;
+ metadata = new Metadata();
}
public Content(String url, String base, byte[] content, String contentType,
@@ -83,21 +84,11 @@
this.mimeTypeMagic = conf.getBoolean("mime.type.magic", true);
this.mimeTypes = MimeTypes.get(conf.get("mime.types.file"));
this.contentType = getContentType(contentType, url, content);
- inflated = true;
}
- public void ensureInflated() {
- if (inflated) {
- return;
- }
- super.ensureInflated();
- inflated = true;
- }
-
- protected final void readFieldsCompressed(DataInput in) throws IOException {
- version = in.readByte();
- metadata = new Metadata();
- switch (version) {
+ private final void readFieldsCompressed(DataInput in) throws IOException {
+ byte oldVersion = in.readByte();
+ switch (oldVersion) {
case 0:
case 1:
url = UTF8.readString(in); // read url
@@ -118,7 +109,7 @@
}
}
break;
- case VERSION:
+ case 2:
url = Text.readString(in); // read url
base = Text.readString(in); // read base
@@ -129,13 +120,41 @@
metadata.readFields(in); // read meta data
break;
default:
- throw new VersionMismatchException(VERSION, version);
+ throw new VersionMismatchException((byte)2, oldVersion);
}
}
+
+ public final void readFields(DataInput in) throws IOException {
+ int sizeOrVersion = in.readInt();
+ if (sizeOrVersion < 0) { // version
+ version = sizeOrVersion;
+ switch (version) {
+ case VERSION:
+ url = Text.readString(in);
+ base = Text.readString(in);
+
+ content = new byte[in.readInt()];
+ in.readFully(content);
+
+ contentType = Text.readString(in);
+ metadata.readFields(in);
+ break;
+ default:
+ throw new VersionMismatchException((byte)VERSION, (byte)version);
+ }
+ } else { // size
+ byte[] compressed = new byte[sizeOrVersion];
+ in.readFully(compressed, 0, compressed.length);
+ ByteArrayInputStream deflated = new ByteArrayInputStream(compressed);
+ DataInput inflater =
+ new DataInputStream(new InflaterInputStream(deflated));
+ readFieldsCompressed(inflater);
+ }
+ }
- protected final void writeCompressed(DataOutput out) throws IOException {
- out.writeByte(VERSION);
+ public final void write(DataOutput out) throws IOException {
+ out.writeInt(VERSION);
Text.writeString(out, url); // write url
Text.writeString(out, base); // write base
@@ -160,7 +179,6 @@
/** The url fetched. */
public String getUrl() {
- ensureInflated();
return url;
}
@@ -168,18 +186,15 @@
* Maybe be different from url if the request redirected.
*/
public String getBaseUrl() {
- ensureInflated();
return base;
}
/** The binary content retrieved. */
public byte[] getContent() {
- ensureInflated();
return content;
}
public void setContent(byte[] content) {
- ensureInflated();
this.content = content;
}
@@ -188,34 +203,28 @@
* http://www.iana.org/assignments/media-types/</a>
*/
public String getContentType() {
- ensureInflated();
return contentType;
}
public void setContentType(String contentType) {
- ensureInflated();
this.contentType = contentType;
}
/** Other protocol-specific data. */
public Metadata getMetadata() {
- ensureInflated();
return metadata;
}
/** Other protocol-specific data. */
public void setMetadata(Metadata metadata) {
- ensureInflated();
this.metadata = metadata;
}
public boolean equals(Object o) {
- ensureInflated();
if (!(o instanceof Content)) {
return false;
}
Content that = (Content) o;
- that.ensureInflated();
return this.url.equals(that.url) && this.base.equals(that.base)
&& Arrays.equals(this.getContent(), that.getContent())
&& this.contentType.equals(that.contentType)
@@ -223,7 +232,6 @@
}
public String toString() {
- ensureInflated();
StringBuffer buffer = new StringBuffer();
buffer.append("Version: " + version + "\n");
@@ -296,13 +304,4 @@
}
return typeName;
}
-
- /**
- * By calling this method caller forces the next access to any property (via
- * getters and setters) to check if decompressing of data is really required.
- */
- public void forceInflate() {
- inflated = false;
- }
-
}
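Content.readFields() tells the two on-disk layouts apart by the sign of the first int: the old CompressedWritable format always begins with the non-negative length of the deflated payload, while the new format writes VERSION = -1 as a marker and inflates legacy records on the fly through readFieldsCompressed(). That is why the lazy ensureInflated()/forceInflate() machinery, including the forceInflate() call removed from ParseSegment above, is no longer needed. The two layouts, reconstructed from this diff and CompressedWritable's wire format:

    // Legacy layout (Content extended CompressedWritable; inner version 0, 1 or 2):
    //   int     length        >= 0, size of the deflated payload
    //   byte[]  payload       deflated { byte version, url, base, content, ... }
    //
    // New layout (this commit, VERSION = -1):
    //   int     -1            negative, so it can never be mistaken for a length
    //   url, base, content length + bytes, contentType, metadata   (uncompressed)
    //
    // readFields() therefore branches on the first int:
    //   sizeOrVersion < 0  -> new record, the int is the version
    //   sizeOrVersion >= 0 -> legacy record, the int is the deflated payload length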
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/protocol/ProtocolStatus.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/protocol/ProtocolStatus.java?view=diff&rev=556946&r1=556945&r2=556946
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/protocol/ProtocolStatus.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/protocol/ProtocolStatus.java Tue Jul 17 08:16:40 2007
@@ -22,15 +22,16 @@
import java.io.IOException;
import java.util.HashMap;
-import org.apache.hadoop.io.VersionedWritable;
+import org.apache.hadoop.io.VersionMismatchException;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
/**
* @author Andrzej Bialecki
*/
-public class ProtocolStatus extends VersionedWritable {
+public class ProtocolStatus implements Writable {
- private final static byte VERSION = 1;
+ private final static byte VERSION = 2;
/** Content was retrieved without errors. */
public static final int SUCCESS = 1;
@@ -110,10 +111,6 @@
}
- public byte getVersion() {
- return VERSION;
- }
-
public ProtocolStatus(int code, String[] args) {
this.code = code;
this.args = args;
@@ -154,17 +151,32 @@
}
public void readFields(DataInput in) throws IOException {
- super.readFields(in); // check version
- code = in.readByte();
- lastModified = in.readLong();
- args = WritableUtils.readCompressedStringArray(in);
+ byte version = in.readByte();
+ switch(version) {
+ case 1:
+ code = in.readByte();
+ lastModified = in.readLong();
+ args = WritableUtils.readCompressedStringArray(in);
+ break;
+ case VERSION:
+ code = in.readByte();
+ lastModified = in.readLong();
+ args = WritableUtils.readStringArray(in);
+ break;
+ default:
+ throw new VersionMismatchException(VERSION, version);
+ }
}
public void write(DataOutput out) throws IOException {
- super.write(out); // write version
+ out.writeByte(VERSION);
out.writeByte((byte)code);
out.writeLong(lastModified);
- WritableUtils.writeCompressedStringArray(out, args);
+ if (args == null) {
+ out.writeInt(-1);
+ } else {
+ WritableUtils.writeStringArray(out, args);
+ }
}
public void setArgs(String[] args) {
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java?view=diff&rev=556946&r1=556945&r2=556946
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java Tue Jul 17 08:16:40 2007
@@ -238,13 +238,15 @@
} else {
wname = new Path(new Path(new Path(job.getOutputPath(), segmentName + "-" + slice), dirName), name);
}
- res = new SequenceFile.Writer(fs, job, wname, Text.class, CrawlDatum.class, progress, new SequenceFile.Metadata());
+ res = SequenceFile.createWriter(fs, job, wname, Text.class,
+ CrawlDatum.class,
+ SequenceFile.getCompressionType(job), progress);
sliceWriters.put(slice + dirName, res);
return res;
}
// lazily create MapFile-s.
- private MapFile.Writer ensureMapFile(String slice, String dirName, Class clazz) throws IOException {
+ private MapFile.Writer ensureMapFile(String slice, String dirName, Class<? extends Writable> clazz) throws IOException {
if (slice == null) slice = DEFAULT_SLICE;
MapFile.Writer res = (MapFile.Writer)sliceWriters.get(slice + dirName);
if (res != null) return res;
@@ -254,7 +256,11 @@
} else {
wname = new Path(new Path(new Path(job.getOutputPath(), segmentName + "-" + slice), dirName), name);
}
- res = new MapFile.Writer(job, fs, wname.toString(), Text.class, clazz, CompressionType.RECORD, progress);
+ CompressionType compType = SequenceFile.getCompressionType(job);
+ if (clazz.isAssignableFrom(ParseText.class)) {
+ compType = CompressionType.RECORD;
+ }
+ res = new MapFile.Writer(job, fs, wname.toString(), Text.class, clazz, compType, progress);
sliceWriters.put(slice + dirName, res);
return res;
}
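SegmentMerger's slice writers now follow the job's compression type as well; the one exception is parse_text, whose MapFile is pinned to RECORD compression. A plausible reason, not stated in the commit, is that parse_text is looked up one entry at a time (for example when generating summaries), and record-level compression keeps such random gets cheap. Condensed, the decision is:

    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.nutch.parse.ParseText;

    public class MergeCompressionChoice {
      // Condensed from the hunk above: every output follows the job's setting,
      // except ParseText, which stays RECORD-compressed.
      public static CompressionType pick(JobConf job, Class<? extends Writable> clazz) {
        CompressionType compType = SequenceFile.getCompressionType(job);
        if (clazz.isAssignableFrom(ParseText.class)) {
          compType = CompressionType.RECORD;
        }
        return compType;
      }
    }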