You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by do...@apache.org on 2007/07/17 17:16:42 UTC

svn commit: r556946 - in /lucene/nutch/trunk: ./ src/java/org/apache/nutch/fetcher/ src/java/org/apache/nutch/parse/ src/java/org/apache/nutch/protocol/ src/java/org/apache/nutch/segment/

Author: dogacan
Date: Tue Jul 17 08:16:40 2007
New Revision: 556946

URL: http://svn.apache.org/viewvc?view=rev&rev=556946
Log:
NUTCH-506 - Delegate compression to Hadoop.

Modified:
    lucene/nutch/trunk/CHANGES.txt
    lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java
    lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseOutputFormat.java
    lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseSegment.java
    lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseStatus.java
    lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseText.java
    lucene/nutch/trunk/src/java/org/apache/nutch/protocol/Content.java
    lucene/nutch/trunk/src/java/org/apache/nutch/protocol/ProtocolStatus.java
    lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java

Modified: lucene/nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/CHANGES.txt?view=diff&rev=556946&r1=556945&r2=556946
==============================================================================
--- lucene/nutch/trunk/CHANGES.txt (original)
+++ lucene/nutch/trunk/CHANGES.txt Tue Jul 17 08:16:40 2007
@@ -89,6 +89,8 @@
 
 30. NUTCH-515 - Next fetch time is set incorrectly. (dogacan)
 
+31. NUTCH-506 - Nutch should delegate compression to Hadoop. (dogacan)
+
 Release 0.9 - 2007-04-02
 
 1. Changed log4j configuration to log to stdout on commandline

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java?view=diff&rev=556946&r1=556945&r2=556946
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java Tue Jul 17 08:16:40 2007
@@ -25,6 +25,7 @@
 import org.apache.hadoop.fs.Path;
 
 import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Text;
@@ -57,10 +58,12 @@
       new Path(new Path(job.getOutputPath(), CrawlDatum.FETCH_DIR_NAME), name);
     final Path content =
       new Path(new Path(job.getOutputPath(), Content.DIR_NAME), name);
+    
+    final CompressionType compType = SequenceFile.getCompressionType(job);
 
     final MapFile.Writer fetchOut =
       new MapFile.Writer(job, fs, fetch.toString(), Text.class, CrawlDatum.class,
-          CompressionType.NONE, progress);
+          compType, progress);
     
     return new RecordWriter() {
         private MapFile.Writer contentOut;
@@ -70,7 +73,7 @@
           if (Fetcher.isStoringContent(job)) {
             contentOut = new MapFile.Writer(job, fs, content.toString(),
                                             Text.class, Content.class,
-                                            CompressionType.NONE, progress);
+                                            compType, progress);
           }
 
           if (Fetcher.isParsing(job)) {

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseOutputFormat.java?view=diff&rev=556946&r1=556945&r2=556946
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseOutputFormat.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseOutputFormat.java Tue Jul 17 08:16:40 2007
@@ -85,6 +85,7 @@
     final float interval = job.getFloat("db.fetch.interval.default", 2592000.0f);
     final boolean ignoreExternalLinks = job.getBoolean("db.ignore.external.links", false);
     final int maxOutlinks = job.getInt("db.max.outlinks.per.page", 100);
+    final CompressionType compType = SequenceFile.getCompressionType(job);
     
     Path text =
       new Path(new Path(job.getOutputPath(), ParseText.DIR_NAME), name);
@@ -99,11 +100,11 @@
     
     final MapFile.Writer dataOut =
       new MapFile.Writer(job, fs, data.toString(), Text.class, ParseData.class,
-          CompressionType.RECORD, progress);
+          compType, progress);
     
     final SequenceFile.Writer crawlOut =
       SequenceFile.createWriter(fs, job, crawl, Text.class, CrawlDatum.class,
-          CompressionType.NONE, progress);
+          compType, progress);
     
     return new RecordWriter() {
 

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseSegment.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseSegment.java?view=diff&rev=556946&r1=556945&r2=556946
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseSegment.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseSegment.java Tue Jul 17 08:16:40 2007
@@ -69,7 +69,6 @@
       key = newKey;
     }
     Content content = (Content) value;
-    content.forceInflate();
 
     ParseResult parseResult = null;
     try {

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseStatus.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseStatus.java?view=diff&rev=556946&r1=556945&r2=556946
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseStatus.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseStatus.java Tue Jul 17 08:16:40 2007
@@ -25,7 +25,8 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.hadoop.io.VersionedWritable;
+import org.apache.hadoop.io.VersionMismatchException;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.conf.Configuration;
 
@@ -35,9 +36,9 @@
 /**
  * @author Andrzej Bialecki <ab@getopt.org>
  */
-public class ParseStatus extends VersionedWritable {
+public class ParseStatus implements Writable {
   
-  private final static byte VERSION = 1;
+  private final static byte VERSION = 2;
   
   // Primary status codes:
   
@@ -136,17 +137,32 @@
   }
   
   public void readFields(DataInput in) throws IOException {
-    super.readFields(in);     // check version
-    majorCode = in.readByte();
-    minorCode = in.readShort();
-    args = WritableUtils.readCompressedStringArray(in);
-  }
+    byte version = in.readByte();
+    switch(version) {
+    case 1:
+      majorCode = in.readByte();
+      minorCode = in.readShort();
+      args = WritableUtils.readCompressedStringArray(in);
+      break;
+    case 2:
+      majorCode = in.readByte();
+      minorCode = in.readShort();
+      args = WritableUtils.readStringArray(in);
+      break;
+    default:
+      throw new VersionMismatchException(VERSION, version);
+    }
+ }
   
   public void write(DataOutput out) throws IOException {
-    super.write(out);         // write out version
+    out.writeByte(VERSION);
     out.writeByte(majorCode);
     out.writeShort(minorCode);
-    WritableUtils.writeCompressedStringArray(out, args);
+    if (args == null) {
+      out.writeInt(-1);
+    } else {
+      WritableUtils.writeStringArray(out, args);
+    }
   }
   
   /** A convenience method. Returns true if majorCode is SUCCESS, false

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseText.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseText.java?view=diff&rev=556946&r1=556945&r2=556946
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseText.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/parse/ParseText.java Tue Jul 17 08:16:40 2007
@@ -26,10 +26,10 @@
 /* The text conversion of page's content, stored using gzip compression.
  * @see Parse#getText()
  */
-public final class ParseText extends VersionedWritable {
+public final class ParseText implements Writable {
   public static final String DIR_NAME = "parse_text";
 
-  private final static byte VERSION = 1;
+  private final static byte VERSION = 2;
 
   public ParseText() {}
   private String text;
@@ -38,18 +38,23 @@
     this.text = text;
   }
 
-  public byte getVersion() { return VERSION; }
-
   public void readFields(DataInput in) throws IOException {
-    super.readFields(in);                         // check version
-    text = WritableUtils.readCompressedString(in);
-    return;
+    byte version = in.readByte();
+    switch (version) {
+    case 1:
+      text = WritableUtils.readCompressedString(in);
+      break;
+    case VERSION:
+      text = Text.readString(in);
+      break;
+    default:
+      throw new VersionMismatchException(VERSION, version);
+    }
   }
 
   public final void write(DataOutput out) throws IOException {
-    super.write(out);                             // write version
-    WritableUtils.writeCompressedString(out, text);
-    return;
+    out.write(VERSION);
+    Text.writeString(out, text);
   }
 
   public final static ParseText read(DataInput in) throws IOException {

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/protocol/Content.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/protocol/Content.java?view=diff&rev=556946&r1=556945&r2=556946
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/protocol/Content.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/protocol/Content.java Tue Jul 17 08:16:40 2007
@@ -17,32 +17,35 @@
 
 package org.apache.nutch.protocol;
 
+import java.io.ByteArrayInputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.zip.InflaterInputStream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.ArrayFile;
-import org.apache.hadoop.io.CompressedWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.VersionMismatchException;
+import org.apache.hadoop.io.Writable;
 import org.apache.nutch.metadata.Metadata;
 import org.apache.nutch.util.NutchConfiguration;
 import org.apache.nutch.util.mime.MimeType;
 import org.apache.nutch.util.mime.MimeTypeException;
 import org.apache.nutch.util.mime.MimeTypes;
 
-public final class Content extends CompressedWritable {
+public final class Content implements Writable{
 
   public static final String DIR_NAME = "content";
 
-  private final static byte VERSION = 2;
+  private final static int VERSION = -1;
 
-  private byte version;
+  private int version;
 
   private String url;
 
@@ -58,10 +61,8 @@
 
   private MimeTypes mimeTypes;
 
-  private boolean inflated;
-
   public Content() {
-    inflated = false;
+    metadata = new Metadata();
   }
 
   public Content(String url, String base, byte[] content, String contentType,
@@ -83,21 +84,11 @@
     this.mimeTypeMagic = conf.getBoolean("mime.type.magic", true);
     this.mimeTypes = MimeTypes.get(conf.get("mime.types.file"));
     this.contentType = getContentType(contentType, url, content);
-    inflated = true;
   }
 
-  public void ensureInflated() {
-    if (inflated) {
-      return;
-    }
-    super.ensureInflated();
-    inflated = true;
-  }
-
-  protected final void readFieldsCompressed(DataInput in) throws IOException {
-    version = in.readByte();
-    metadata = new Metadata();
-    switch (version) {
+  private final void readFieldsCompressed(DataInput in) throws IOException {
+    byte oldVersion = in.readByte();
+    switch (oldVersion) {
     case 0:
     case 1:
       url = UTF8.readString(in); // read url
@@ -118,7 +109,7 @@
         }
       }
       break;
-    case VERSION:
+    case 2:
       url = Text.readString(in); // read url
       base = Text.readString(in); // read base
 
@@ -129,13 +120,41 @@
       metadata.readFields(in); // read meta data
       break;
     default:
-      throw new VersionMismatchException(VERSION, version);
+      throw new VersionMismatchException((byte)2, oldVersion);
     }
 
   }
+  
+  public final void readFields(DataInput in) throws IOException {
+    int sizeOrVersion = in.readInt();
+    if (sizeOrVersion < 0) { // version
+      version = sizeOrVersion;
+      switch (version) {
+      case VERSION:
+        url = Text.readString(in);
+        base = Text.readString(in);
+
+        content = new byte[in.readInt()];
+        in.readFully(content);
+
+        contentType = Text.readString(in);
+        metadata.readFields(in);
+        break;
+      default:
+        throw new VersionMismatchException((byte)VERSION, (byte)version);
+      }
+    } else { // size
+      byte[] compressed = new byte[sizeOrVersion];
+      in.readFully(compressed, 0, compressed.length);
+      ByteArrayInputStream deflated = new ByteArrayInputStream(compressed);
+      DataInput inflater =
+        new DataInputStream(new InflaterInputStream(deflated));
+      readFieldsCompressed(inflater);
+    }
+  }
 
-  protected final void writeCompressed(DataOutput out) throws IOException {
-    out.writeByte(VERSION);
+  public final void write(DataOutput out) throws IOException {
+    out.writeInt(VERSION);
 
     Text.writeString(out, url); // write url
     Text.writeString(out, base); // write base
@@ -160,7 +179,6 @@
 
   /** The url fetched. */
   public String getUrl() {
-    ensureInflated();
     return url;
   }
 
@@ -168,18 +186,15 @@
    * Maybe be different from url if the request redirected.
    */
   public String getBaseUrl() {
-    ensureInflated();
     return base;
   }
 
   /** The binary content retrieved. */
   public byte[] getContent() {
-    ensureInflated();
     return content;
   }
 
   public void setContent(byte[] content) {
-    ensureInflated();
     this.content = content;
   }
 
@@ -188,34 +203,28 @@
    *      http://www.iana.org/assignments/media-types/</a>
    */
   public String getContentType() {
-    ensureInflated();
     return contentType;
   }
 
   public void setContentType(String contentType) {
-    ensureInflated();
     this.contentType = contentType;
   }
 
   /** Other protocol-specific data. */
   public Metadata getMetadata() {
-    ensureInflated();
     return metadata;
   }
 
   /** Other protocol-specific data. */
   public void setMetadata(Metadata metadata) {
-    ensureInflated();
     this.metadata = metadata;
   }
 
   public boolean equals(Object o) {
-    ensureInflated();
     if (!(o instanceof Content)) {
       return false;
     }
     Content that = (Content) o;
-    that.ensureInflated();
     return this.url.equals(that.url) && this.base.equals(that.base)
         && Arrays.equals(this.getContent(), that.getContent())
         && this.contentType.equals(that.contentType)
@@ -223,7 +232,6 @@
   }
 
   public String toString() {
-    ensureInflated();
     StringBuffer buffer = new StringBuffer();
 
     buffer.append("Version: " + version + "\n");
@@ -296,13 +304,4 @@
     }
     return typeName;
   }
-
-  /**
-   * By calling this method caller forces the next access to any property (via
-   * getters and setters) to check if decompressing of data is really required.
-   */
-  public void forceInflate() {
-    inflated = false;
-  }
-
 }

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/protocol/ProtocolStatus.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/protocol/ProtocolStatus.java?view=diff&rev=556946&r1=556945&r2=556946
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/protocol/ProtocolStatus.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/protocol/ProtocolStatus.java Tue Jul 17 08:16:40 2007
@@ -22,15 +22,16 @@
 import java.io.IOException;
 import java.util.HashMap;
 
-import org.apache.hadoop.io.VersionedWritable;
+import org.apache.hadoop.io.VersionMismatchException;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 
 /**
  * @author Andrzej Bialecki
  */
-public class ProtocolStatus extends VersionedWritable {
+public class ProtocolStatus implements Writable {
   
-  private final static byte VERSION = 1;
+  private final static byte VERSION = 2;
   
   /** Content was retrieved without errors. */
   public static final int SUCCESS              = 1;
@@ -110,10 +111,6 @@
     
   }
 
-  public byte getVersion() {
-    return VERSION;
-  }
-
   public ProtocolStatus(int code, String[] args) {
     this.code = code;
     this.args = args;
@@ -154,17 +151,32 @@
   }
   
   public void readFields(DataInput in) throws IOException {
-    super.readFields(in);       // check version
-    code = in.readByte();
-    lastModified = in.readLong();
-    args = WritableUtils.readCompressedStringArray(in);
+    byte version = in.readByte();
+    switch(version) {
+    case 1:
+      code = in.readByte();
+      lastModified = in.readLong();
+      args = WritableUtils.readCompressedStringArray(in);
+      break;
+    case VERSION:
+      code = in.readByte();
+      lastModified = in.readLong();
+      args = WritableUtils.readStringArray(in);
+      break;
+    default:
+      throw new VersionMismatchException(VERSION, version);
+    }
   }
   
   public void write(DataOutput out) throws IOException {
-    super.write(out);           // write version
+    out.writeByte(VERSION);
     out.writeByte((byte)code);
     out.writeLong(lastModified);
-    WritableUtils.writeCompressedStringArray(out, args);
+    if (args == null) {
+      out.writeInt(-1);
+    } else {
+      WritableUtils.writeStringArray(out, args);
+    }
   }
 
   public void setArgs(String[] args) {

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java?view=diff&rev=556946&r1=556945&r2=556946
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java Tue Jul 17 08:16:40 2007
@@ -238,13 +238,15 @@
           } else {
             wname = new Path(new Path(new Path(job.getOutputPath(), segmentName + "-" + slice), dirName), name);
           }
-          res = new SequenceFile.Writer(fs, job, wname, Text.class, CrawlDatum.class, progress, new SequenceFile.Metadata());
+          res = SequenceFile.createWriter(fs, job, wname, Text.class, 
+                                          CrawlDatum.class, 
+                                          SequenceFile.getCompressionType(job), progress);
           sliceWriters.put(slice + dirName, res);
           return res;
         }
 
         // lazily create MapFile-s.
-        private MapFile.Writer ensureMapFile(String slice, String dirName, Class clazz) throws IOException {
+        private MapFile.Writer ensureMapFile(String slice, String dirName, Class<? extends Writable> clazz) throws IOException {
           if (slice == null) slice = DEFAULT_SLICE;
           MapFile.Writer res = (MapFile.Writer)sliceWriters.get(slice + dirName);
           if (res != null) return res;
@@ -254,7 +256,11 @@
           } else {
             wname = new Path(new Path(new Path(job.getOutputPath(), segmentName + "-" + slice), dirName), name);
           }
-          res = new MapFile.Writer(job, fs, wname.toString(), Text.class, clazz, CompressionType.RECORD, progress);
+          CompressionType compType = SequenceFile.getCompressionType(job);
+          if (clazz.isAssignableFrom(ParseText.class)) {
+            compType = CompressionType.RECORD;
+          }
+          res = new MapFile.Writer(job, fs, wname.toString(), Text.class, clazz, compType, progress);
           sliceWriters.put(slice + dirName, res);
           return res;
         }