You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/10/03 23:08:16 UTC

svn commit: r452624 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/io/compress/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/io/

Author: cutting
Date: Tue Oct  3 14:08:15 2006
New Revision: 452624

URL: http://svn.apache.org/viewvc?view=rev&rev=452624
Log:
HADOOP-522.  Permit block compression with MapFile and SetFile.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/MapFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SetFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSetFile.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=452624&r1=452623&r2=452624
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Oct  3 14:08:15 2006
@@ -112,6 +112,10 @@
     stream. Implement an optimized version for DFS and local FS.
     (Milind Bhandarkar via cutting)
 
+28. HADOOP-522. Permit block compression with MapFile and SetFile.
+    Since these formats are always sorted, block compression can
+    provide a big advantage.  (cutting)
+
 
 Release 0.6.2 - 2006-09-18
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/MapFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/MapFile.java?view=diff&rev=452624&r1=452623&r2=452624
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/MapFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/MapFile.java Tue Oct  3 14:08:15 2006
@@ -19,6 +19,7 @@
 import java.io.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 
 /** A file-based map from keys to values.
  * 
@@ -69,12 +70,20 @@
       this(fs, dirName, WritableComparator.get(keyClass), valClass, false);
     }
 
-    /** Create the named map for keys of the named class. */
+    /** Create the named map for keys of the named class.
+     * @deprecated specify a {@link CompressionType} instead
+     */
     public Writer(FileSystem fs, String dirName,
                   Class keyClass, Class valClass, boolean compress)
       throws IOException {
       this(fs, dirName, WritableComparator.get(keyClass), valClass, compress);
     }
+    /** Create the named map for keys of the named class. */
+    public Writer(Configuration conf, FileSystem fs, String dirName,
+                  Class keyClass, Class valClass, CompressionType compress)
+      throws IOException {
+      this(conf,fs,dirName,WritableComparator.get(keyClass),valClass,compress);
+    }
 
     /** Create the named map using the named key comparator. */
     public Writer(FileSystem fs, String dirName,
@@ -82,12 +91,24 @@
       throws IOException {
       this(fs, dirName, comparator, valClass, false);
     }
-    /** Create the named map using the named key comparator. */
+    /** Create the named map using the named key comparator.
+     * @deprecated specify a {@link CompressionType} instead
+     */
     public Writer(FileSystem fs, String dirName,
                   WritableComparator comparator, Class valClass,
                   boolean compress)
       throws IOException {
 
+      this(new Configuration(), fs, dirName, comparator, valClass,
+           compress ? CompressionType.RECORD : CompressionType.NONE);
+    }
+
+    /** Create the named map using the named key comparator. */
+    public Writer(Configuration conf, FileSystem fs, String dirName,
+                  WritableComparator comparator, Class valClass,
+                  SequenceFile.CompressionType compress)
+      throws IOException {
+
       this.comparator = comparator;
       this.lastKey = comparator.newKey();
 
@@ -99,9 +120,11 @@
 
       Class keyClass = comparator.getKeyClass();
       this.data =
-        new SequenceFile.Writer(fs, dataFile, keyClass, valClass, compress);
+        SequenceFile.createWriter
+        (fs,conf,dataFile,keyClass,valClass,compress);
       this.index =
-        new SequenceFile.Writer(fs, indexFile, keyClass, LongWritable.class);
+        SequenceFile.createWriter
+        (fs,conf,indexFile,keyClass,LongWritable.class,CompressionType.BLOCK);
     }
     
     /** The number of entries that are added before an index entry is added.*/
@@ -159,9 +182,7 @@
       
     private WritableComparator comparator;
 
-    private DataOutputBuffer keyBuf = new DataOutputBuffer();
-    private DataOutputBuffer nextBuf = new DataOutputBuffer();
-    private int nextKeyLen = -1;
+    private WritableComparable nextKey;
     private long seekPosition = -1;
     private int seekIndex = -1;
     private long firstPosition;
@@ -300,14 +321,11 @@
     public synchronized boolean seek(WritableComparable key)
       throws IOException {
       readIndex();                                // make sure index is read
-      keyBuf.reset();                             // write key to keyBuf
-      key.write(keyBuf);
 
       if (seekIndex != -1                         // seeked before
           && seekIndex+1 < count           
           && comparator.compare(key,keys[seekIndex+1])<0 // before next indexed
-          && comparator.compare(keyBuf.getData(), 0, keyBuf.getLength(),
-                                nextBuf.getData(), 0, nextKeyLen)
+          && comparator.compare(key, nextKey)
           >= 0) {                                 // but after last seeked
         // do nothing
       } else {
@@ -322,14 +340,14 @@
       }
       data.seek(seekPosition);
       
-      while ((nextKeyLen = data.next(nextBuf.reset())) != -1) {
-        int c = comparator.compare(keyBuf.getData(), 0, keyBuf.getLength(),
-                                   nextBuf.getData(), 0, nextKeyLen);
+      if (nextKey == null)
+        nextKey = comparator.newKey();
+
+      while (data.next(nextKey)) {
+        int c = comparator.compare(key, nextKey);
         if (c <= 0) {                             // at or beyond desired
-          data.seek(seekPosition);                // back off to previous
           return c == 0;
         }
-        seekPosition = data.getPosition();
       }
 
       return false;
@@ -366,7 +384,7 @@
     public synchronized Writable get(WritableComparable key, Writable val)
       throws IOException {
       if (seek(key)) {
-        next(getKey, val);                        // don't smash key
+        data.getCurrentValue(val);
         return val;
       } else
         return null;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=452624&r1=452623&r2=452624
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Tue Oct  3 14:08:15 2006
@@ -592,7 +592,15 @@
       val.writeUncompressedBytes(out);            // value
     }
 
-    /** Returns the current length of the output file. */
+    /** Returns the current length of the output file.
+     *
+     * <p>This always returns a synchronized position.  In other words, {@link
+     * immediately after calling {@link Reader#seek(long)} with a position
+     * returned by this method, Reader#next(Writable) may be called.  However
+     * the key may be earlier in the file than key last written when this
+     * method was called (e.g., with block-compression, it may be the first key
+     * in the block that was being written when this method was called).
+     */
     public synchronized long getLength() throws IOException {
       return out.getPos();
     }
@@ -1023,13 +1031,13 @@
         valLenBuffer = new DataInputBuffer();
         
         keyLenInFilter = this.codec.createInputStream(keyLenBuffer);
-        keyLenIn = new DataInputStream(new BufferedInputStream(keyLenInFilter));
+        keyLenIn = new DataInputStream(keyLenInFilter);
 
         keyInFilter = this.codec.createInputStream(keyBuffer);
-        keyIn = new DataInputStream(new BufferedInputStream(keyInFilter));
+        keyIn = new DataInputStream(keyInFilter);
 
         valLenInFilter = this.codec.createInputStream(valLenBuffer);
-        valLenIn = new DataInputStream(new BufferedInputStream(valLenInFilter));
+        valLenIn = new DataInputStream(valLenInFilter);
       }
       
 
@@ -1058,19 +1066,17 @@
 
     /** Read a compressed buffer */
     private synchronized void readBuffer(DataInputBuffer buffer, 
-        CompressionInputStream filter, boolean castAway) throws IOException {
+        CompressionInputStream filter) throws IOException {
       // Read data into a temporary buffer
       DataOutputBuffer dataBuffer = new DataOutputBuffer();
       int dataBufferLength = WritableUtils.readVInt(in);
       dataBuffer.write(in, dataBufferLength);
       
-      if (false == castAway) {
-        // Reset the codec
-        filter.resetState();
-        
-        // Set up 'buffer' connected to the input-stream
-        buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
-      }
+      // Set up 'buffer' connected to the input-stream
+      buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
+
+      // Reset the codec
+      filter.resetState();
     }
     
     /** Read the next 'compressed' block */
@@ -1078,8 +1084,8 @@
       // Check if we need to throw away a whole block of 
       // 'values' due to 'lazy decompression' 
       if (lazyDecompress && !valuesDecompressed) {
-        readBuffer(null, null, true);
-        readBuffer(null, null, true);
+        in.seek(WritableUtils.readVInt(in)+in.getPos());
+        in.seek(WritableUtils.readVInt(in)+in.getPos());
       }
       
       // Reset internal states
@@ -1099,14 +1105,14 @@
       noBufferedRecords = WritableUtils.readVInt(in);
       
       // Read key lengths and keys
-      readBuffer(keyLenBuffer, keyLenInFilter, false);
-      readBuffer(keyBuffer, keyInFilter, false);
+      readBuffer(keyLenBuffer, keyLenInFilter);
+      readBuffer(keyBuffer, keyInFilter);
       noBufferedKeys = noBufferedRecords;
       
       // Read value lengths and values
       if (!lazyDecompress) {
-        readBuffer(valLenBuffer, valLenInFilter, false);
-        readBuffer(valBuffer, valInFilter, false);
+        readBuffer(valLenBuffer, valLenInFilter);
+        readBuffer(valBuffer, valInFilter);
         noBufferedValues = noBufferedRecords;
         valuesDecompressed = true;
       }
@@ -1126,8 +1132,8 @@
         // Check if this is the first value in the 'block' to be read
         if (lazyDecompress && !valuesDecompressed) {
           // Read the value lengths and values
-          readBuffer(valLenBuffer, valLenInFilter, false);
-          readBuffer(valBuffer, valInFilter, false);
+          readBuffer(valLenBuffer, valLenInFilter);
+          readBuffer(valBuffer, valInFilter);
           noBufferedValues = noBufferedRecords;
           valuesDecompressed = true;
         }
@@ -1377,9 +1383,18 @@
       }
     }
 
-    /** Set the current byte position in the input file. */
+    /** Set the current byte position in the input file.
+     *
+     * <p>The position passed must be a position returned by {@link
+     * Writer#getLength()} when writing this file.  To seek to an arbitrary
+     * position, use {@link Reader#sync(long)}.
+     */
     public synchronized void seek(long position) throws IOException {
       in.seek(position);
+      if (blockCompressed) {                      // trigger block read
+        noBufferedKeys = 0;
+        valuesDecompressed = true;
+      }
     }
 
     /** Seek to the next sync mark past a given position.*/

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SetFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SetFile.java?view=diff&rev=452624&r1=452623&r2=452624
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SetFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SetFile.java Tue Oct  3 14:08:15 2006
@@ -20,6 +20,7 @@
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 
 /** A file-based set of keys. */
 public class SetFile extends MapFile {
@@ -38,6 +39,20 @@
     public Writer(FileSystem fs, String dirName, WritableComparator comparator)
       throws IOException {
       super(fs, dirName, comparator, NullWritable.class);
+    }
+
+    /** Create a set naming the element class and compression type. */
+    public Writer(Configuration conf, FileSystem fs, String dirName,
+                  Class keyClass, SequenceFile.CompressionType compress)
+      throws IOException {
+      this(conf, fs, dirName, WritableComparator.get(keyClass), compress);
+    }
+
+    /** Create a set naming the element comparator and compression type. */
+    public Writer(Configuration conf, FileSystem fs, String dirName,
+                  WritableComparator comparator,
+                  SequenceFile.CompressionType compress) throws IOException {
+      super(conf, fs, dirName, comparator, NullWritable.class, compress);
     }
 
     /** Append a key to a set.  The key must be strictly greater than the

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java?view=diff&rev=452624&r1=452623&r2=452624
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java Tue Oct  3 14:08:15 2006
@@ -20,8 +20,13 @@
 import java.io.InputStream;
 
 /**
-* A compression input stream.
-  * @author Arun C Murthy
+ * A compression input stream.
+ *
+ * <p>Implementations are assumed to be buffered.  This permits clients to
+ * reposition the underlying input stream then call {@link #resetState()},
+ * without having to also synchronize client buffers.
+ *
+ * @author Arun C Murthy
  */
 public abstract class CompressionInputStream extends InputStream {
   /**
@@ -49,8 +54,8 @@
   public abstract int read(byte[] b, int off, int len) throws IOException;
 
   /**
-   * Reset the compression to the initial state. Does not reset the underlying
-   * stream.
+   * Reset the decompressor to its initial state and discard any buffered data,
+   * as the underlying stream may have been repositioned.
    */
   public abstract void resetState() throws IOException;
   

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java?view=diff&rev=452624&r1=452623&r2=452624
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java Tue Oct  3 14:08:15 2006
@@ -23,6 +23,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
@@ -40,10 +41,10 @@
 
     // ignore the progress parameter, since MapFile is local
     final MapFile.Writer out =
-      new MapFile.Writer(fs, file.toString(),
+      new MapFile.Writer(job, fs, file.toString(),
                          job.getMapOutputKeyClass(),
                          job.getMapOutputValueClass(),
-                         job.getBoolean("mapred.output.compress", false));
+                         SequenceFile.getCompressionType(job));
 
     return new RecordWriter() {
 

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSetFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSetFile.java?view=diff&rev=452624&r1=452623&r2=452624
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSetFile.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSetFile.java Tue Oct  3 14:08:15 2006
@@ -24,6 +24,7 @@
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 
 /** Support for flat files of binary key/value pairs. */
 public class TestSetFile extends TestCase {
@@ -39,7 +40,10 @@
     FileSystem fs = new LocalFileSystem(conf);
     try {
         RandomDatum[] data = generate(10000);
-        writeTest(fs, data, FILE);
+        writeTest(fs, data, FILE, CompressionType.NONE);
+        readTest(fs, data, FILE);
+
+        writeTest(fs, data, FILE, CompressionType.BLOCK);
         readTest(fs, data, FILE);
     } finally {
         fs.close();
@@ -47,23 +51,27 @@
   }
 
   private static RandomDatum[] generate(int count) {
-    LOG.debug("generating " + count + " records in memory");
+    LOG.info("generating " + count + " records in memory");
     RandomDatum[] data = new RandomDatum[count];
     RandomDatum.Generator generator = new RandomDatum.Generator();
     for (int i = 0; i < count; i++) {
       generator.next();
       data[i] = generator.getValue();
     }
-    LOG.info("sorting " + count + " records in debug");
+    LOG.info("sorting " + count + " records");
     Arrays.sort(data);
     return data;
   }
 
-  private static void writeTest(FileSystem fs, RandomDatum[] data, String file)
+  private static void writeTest(FileSystem fs, RandomDatum[] data,
+                                String file, CompressionType compress)
     throws IOException {
     MapFile.delete(fs, file);
-    LOG.debug("creating with " + data.length + " records");
-    SetFile.Writer writer = new SetFile.Writer(fs, file, RandomDatum.class);
+    LOG.info("creating with " + data.length + " records");
+    SetFile.Writer writer =
+      new SetFile.Writer(conf, fs, file,
+                         WritableComparator.get(RandomDatum.class),
+                         compress);
     for (int i = 0; i < data.length; i++)
       writer.append(data[i]);
     writer.close();
@@ -72,14 +80,16 @@
   private static void readTest(FileSystem fs, RandomDatum[] data, String file)
     throws IOException {
     RandomDatum v = new RandomDatum();
-    LOG.debug("reading " + data.length + " records");
+    int sample = (int)Math.sqrt(data.length);
+    Random random = new Random();
+    LOG.info("reading " + sample + " records");
     SetFile.Reader reader = new SetFile.Reader(fs, file, conf);
-    for (int i = 0; i < data.length; i++) {
-      if (!reader.seek(data[i]))
+    for (int i = 0; i < sample; i++) {
+      if (!reader.seek(data[random.nextInt(data.length)]))
         throw new RuntimeException("wrong value at " + i);
     }
     reader.close();
-    LOG.info("done reading " + data.length + " debug");
+    LOG.info("done reading " + data.length);
   }
 
 
@@ -89,7 +99,9 @@
     boolean create = true;
     boolean check = true;
     String file = FILE;
-    String usage = "Usage: TestSetFile (-local | -dfs <namenode:port>) [-count N] [-nocreate] [-nocheck] file";
+    String compress = "NONE";
+
+    String usage = "Usage: TestSetFile (-local | -dfs <namenode:port>) [-count N] [-nocreate] [-nocheck] [-compress type] file";
       
     if (args.length == 0) {
       System.err.println(usage);
@@ -108,26 +120,30 @@
           create = false;
         } else if (args[i].equals("-nocheck")) {
           check = false;
+        } else if (args[i].equals("-compress")) {
+          compress = args[++i];
         } else {
           // file is required parameter
           file = args[i];
         }
+      }
 
-        LOG.info("count = " + count);
-        LOG.info("create = " + create);
-        LOG.info("check = " + check);
-        LOG.info("file = " + file);
-
-        RandomDatum[] data = generate(count);
-
-        if (create) {
-          writeTest(fs, data, file);
-        }
-
-        if (check) {
-          readTest(fs, data, file);
-        }
+      LOG.info("count = " + count);
+      LOG.info("create = " + create);
+      LOG.info("check = " + check);
+      LOG.info("compress = " + compress);
+      LOG.info("file = " + file);
+      
+      RandomDatum[] data = generate(count);
+      
+      if (create) {
+        writeTest(fs, data, file, CompressionType.valueOf(compress));
       }
+      
+      if (check) {
+        readTest(fs, data, file);
+      }
+  
     } finally {
       fs.close();
     }