You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/08/10 18:36:48 UTC

svn commit: r231281 - in /lucene/nutch/branches/mapred/src: java/org/apache/nutch/io/DataInputBuffer.java java/org/apache/nutch/io/SequenceFile.java test/org/apache/nutch/io/TestSequenceFile.java

Author: cutting
Date: Wed Aug 10 09:36:41 2005
New Revision: 231281

URL: http://svn.apache.org/viewcvs?rev=231281&view=rev
Log:
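Add value compression option to SequenceFile.  Values may optionally be
compressed with java.util.zip deflate; the file format version changes from
2 to 3, and the header now records a boolean compression flag after the key
and value class names.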

Modified:
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/DataInputBuffer.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java
    lucene/nutch/branches/mapred/src/test/org/apache/nutch/io/TestSequenceFile.java

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/DataInputBuffer.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/DataInputBuffer.java?rev=231281&r1=231280&r2=231281&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/DataInputBuffer.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/DataInputBuffer.java Wed Aug 10 09:36:41 2005
@@ -53,6 +53,7 @@
     }
 
     public int getPosition() { return pos; }
+    public int getLength() { return count; }
   }
 
   private Buffer buffer;
@@ -79,5 +80,8 @@
 
   /** Returns the current position in the input. */
   public int getPosition() { return buffer.getPosition(); }
+
+  /** Returns the length of the input. */
+  public int getLength() { return buffer.getLength(); }
 
 }

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java?rev=231281&r1=231280&r2=231281&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java Wed Aug 10 09:36:41 2005
@@ -18,6 +18,7 @@
 
 import java.io.*;
 import java.util.*;
+import java.util.zip.*;
 import java.util.logging.*;
 import java.nio.channels.*;
 import java.net.InetAddress;
@@ -35,7 +36,7 @@
   private SequenceFile() {}                         // no public ctor
 
   private static byte[] VERSION = new byte[] {
-    (byte)'S', (byte)'E', (byte)'Q', 2
+    (byte)'S', (byte)'E', (byte)'Q', 3
   };
 
   private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
@@ -55,6 +56,11 @@
     private Class keyClass;
     private Class valClass;
 
+    private boolean deflateValues;
+    private DataOutputBuffer deflateIn = new DataOutputBuffer();
+    private byte[] deflateOut = new byte[1024];
+    private Deflater deflater = new Deflater();
+
     // Insert a globally unique 16-byte value every few entries, so that one
     // can seek into the middle of a file and then synchronize with record
     // starts and ends by scanning for this value.
@@ -74,30 +80,45 @@
     public Writer(NutchFileSystem nfs, String name,
                   Class keyClass, Class valClass)
       throws IOException {
+      this(nfs, name, keyClass, valClass, false);
+    }
+    
+    /** Create the named file.
+     * @param compress if true, values are compressed.
+     */
+    public Writer(NutchFileSystem nfs, String name,
+                  Class keyClass, Class valClass, boolean compress)
+      throws IOException {
       this.nfs = nfs;
       this.target = new File(name);
       init(new NFSDataOutputStream(nfs.create(target)),
-           keyClass, valClass);
+           keyClass, valClass, compress);
     }
     
     /** Write to an arbitrary stream using a specified buffer size. */
     private Writer(NFSDataOutputStream out,
-                   Class keyClass, Class valClass) throws IOException {
-      init(out, keyClass, valClass);
+                   Class keyClass, Class valClass, boolean compress)
+      throws IOException {
+      init(out, keyClass, valClass, compress);
     }
     
     /** Write and flush the file header. */
     private void init(NFSDataOutputStream out,
-                      Class keyClass, Class valClass) throws IOException {
+                      Class keyClass, Class valClass,
+                      boolean compress) throws IOException {
       this.out = out;
       this.out.write(VERSION);
 
       this.keyClass = keyClass;
       this.valClass = valClass;
 
+      this.deflateValues = compress;
+
       new UTF8(WritableName.getName(keyClass)).write(this.out);
       new UTF8(WritableName.getName(valClass)).write(this.out);
 
+      this.out.writeBoolean(deflateValues);
+
       out.write(sync);                            // write the sync bytes
 
       this.out.flush();                           // flush header
@@ -134,7 +155,20 @@
       if (keyLength == 0)
         throw new IOException("zero length keys not allowed: " + key);
 
-      val.write(buffer);
+      if (deflateValues) {
+        deflateIn.reset();
+        val.write(deflateIn);
+        deflater.reset();
+        deflater.setInput(deflateIn.getData(), 0, deflateIn.getLength());
+        deflater.finish();
+        while (!deflater.finished()) {
+          int count = deflater.deflate(deflateOut);
+          buffer.write(deflateOut, 0, count);
+        }
+      } else {
+        val.write(buffer);
+      }
+
       append(buffer.getData(), 0, buffer.getLength(), keyLength);
     }
 
@@ -185,6 +219,11 @@
     private long end;
     private int keyLength;
 
+    private boolean inflateValues;
+    private byte[] inflateIn = new byte[1024];
+    private DataOutputBuffer inflateOut = new DataOutputBuffer();
+    private Inflater inflater = new Inflater();
+
     /** Open the named file. */
     public Reader(NutchFileSystem nfs, String file) throws IOException {
       this(nfs, file, NutchConf.get().getInt("io.file.buffer.size", 4096));
@@ -229,6 +268,10 @@
       className.readFields(in);                   // read val class name
       this.valClass = WritableName.getClass(className.toString());
 
+      if (version[3] > 2) {                       // if version > 2
+        this.inflateValues = in.readBoolean();    // is compressed?
+      }
+
       if (version[3] > 1) {                       // if version > 1
         in.readFully(sync);                       // read sync bytes
       }
@@ -245,6 +288,9 @@
     /** Returns the class of values in this file. */
     public Class getValueClass() { return valClass; }
 
+    /** Returns true if values are compressed. */
+    public boolean isCompressed() { return inflateValues; }
+
     /** Read the next key in the file into <code>key</code>, skipping its
      * value.  True if another entry exists, and false at end of file. */
     public synchronized boolean next(Writable key) throws IOException {
@@ -278,11 +324,29 @@
       boolean more = next(key);
 
       if (more) {
+
+        if (inflateValues) {
+          inflater.reset();
+          inflater.setInput(outBuf.getData(), keyLength,
+                            outBuf.getLength()-keyLength);
+          inflateOut.reset();
+          while (!inflater.finished()) {
+            try {
+              int count = inflater.inflate(inflateIn);
+              inflateOut.write(inflateIn, 0, count);
+            } catch (DataFormatException e) {
+              throw new IOException (e.toString());
+            }
+          }
+          inBuf.reset(inflateOut.getData(), inflateOut.getLength());
+        }
+
         val.readFields(inBuf);
-        if (inBuf.getPosition() != outBuf.getLength())
+
+        if (inBuf.getPosition() != inBuf.getLength())
           throw new IOException(val+" read "+(inBuf.getPosition()-keyLength)
                                 + " bytes, should read " +
-                                (outBuf.getLength()-keyLength));
+                                (inBuf.getLength()-keyLength));
       }
 
       return more;
@@ -533,7 +597,7 @@
           out.writeLong(count);                   // write count
         }
 
-        Writer writer = new Writer(out, keyClass, valClass);
+        Writer writer = new Writer(out, keyClass, valClass, in.isCompressed());
         if (!done) {
           writer.sync = null;                     // disable sync on temp files
         }
@@ -651,7 +715,7 @@
 
             MergeStream ms = new MergeStream(reader); // add segment to queue
             if (ms.next()) {
-              queue.put(ms);
+              queue.add(ms);
             }
             in.seek(reader.end);
           }
@@ -740,6 +804,16 @@
     private class MergeQueue extends PriorityQueue {
       private NFSDataOutputStream out;
       private boolean done;
+      private boolean compress;
+
+      public void add(MergeStream stream) throws IOException {
+        if (size() == 0) {
+          compress = stream.in.isCompressed();
+        } else if (compress != stream.in.isCompressed()) {
+          throw new IOException("All merged files must be compressed or not.");
+        }
+        put(stream);
+      }
 
       public MergeQueue(int size, String outName, boolean done)
         throws IOException {
@@ -758,7 +832,7 @@
       }
 
       public void merge() throws IOException {
-        Writer writer = new Writer(out, keyClass, valClass);
+        Writer writer = new Writer(out, keyClass, valClass, compress);
         if (!done) {
           writer.sync = null;                     // disable sync on temp files
         }

Modified: lucene/nutch/branches/mapred/src/test/org/apache/nutch/io/TestSequenceFile.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/test/org/apache/nutch/io/TestSequenceFile.java?rev=231281&r1=231280&r2=231281&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/test/org/apache/nutch/io/TestSequenceFile.java (original)
+++ lucene/nutch/branches/mapred/src/test/org/apache/nutch/io/TestSequenceFile.java Wed Aug 10 09:36:41 2005
@@ -42,7 +42,7 @@
     NutchFileSystem nfs = new LocalFileSystem();
     try {
         //LOG.setLevel(Level.FINE);
-        writeTest(nfs, count, seed, file);
+        writeTest(nfs, count, seed, file, false);
         readTest(nfs, count, seed, file);
 
         sortTest(nfs, count, megabytes, factor, false, file);
@@ -61,12 +61,14 @@
     }
   }
 
-  private static void writeTest(NutchFileSystem nfs, int count, int seed, String file)
+  private static void writeTest(NutchFileSystem nfs, int count, int seed,
+                                String file, boolean compress)
     throws IOException {
     new File(file).delete();
     LOG.fine("creating with " + count + " records");
     SequenceFile.Writer writer =
-      new SequenceFile.Writer(nfs, file, RandomDatum.class, RandomDatum.class);
+      new SequenceFile.Writer(nfs, file, RandomDatum.class, RandomDatum.class,
+                              compress);
     RandomDatum.Generator generator = new RandomDatum.Generator(seed);
     for (int i = 0; i < count; i++) {
       generator.next();
@@ -210,8 +212,9 @@
     boolean check = false;
     boolean fast = false;
     boolean merge = false;
+    boolean compress = false;
     String file = null;
-    String usage = "Usage: SequenceFile (-local | -ndfs <namenode:port>) [-count N] [-megabytes M] [-factor F] [-nocreate] [-check] [-fast] [-merge] file";
+    String usage = "Usage: SequenceFile (-local | -ndfs <namenode:port>) [-count N] [-megabytes M] [-factor F] [-nocreate] [-check] [-fast] [-merge] [-compress] file";
     
     if (args.length == 0) {
         System.err.println(usage);
@@ -237,6 +240,8 @@
               fast = true;
           } else if (args[i].equals("-merge")) {
               merge = true;
+          } else if (args[i].equals("-compress")) {
+              compress = true;
           } else {
               // file is required parameter
               file = args[i];
@@ -249,6 +254,7 @@
         LOG.info("check = " + check);
         LOG.info("fast = " + fast);
         LOG.info("merge = " + merge);
+        LOG.info("compress = " + compress);
         LOG.info("file = " + file);
 
         int seed = 0;
@@ -256,7 +262,7 @@
         LOG.setLevel(Level.FINE);
 
         if (create && !merge) {
-            writeTest(nfs, count, seed, file);
+            writeTest(nfs, count, seed, file, compress);
             readTest(nfs, count, seed, file);
         }