You are viewing a plain text version of this content. The canonical link for it was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/06/07 00:18:49 UTC

svn commit: r664159 - in /hadoop/core/trunk: ./ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapred/lib/

Author: omalley
Date: Fri Jun  6 15:18:48 2008
New Revision: 664159

URL: http://svn.apache.org/viewvc?rev=664159&view=rev
Log:
 HADOOP-3413. Allow SequenceFile.Reader to use serialization framework. Contributed by Tom White.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFormat.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=664159&r1=664158&r2=664159&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Jun  6 15:18:48 2008
@@ -72,11 +72,12 @@
     (hairong)
 
     HADOOP-3459. Change in the output format of dfs -ls to more closely match
-    /bin/ls. New format is: perm repl owner group size date name (Mukund Madhugiri via omally)
+    /bin/ls. New format is: perm repl owner group size date name
+    (Mukund Madhugiri via omally)
 
-    HADOOP-3113. An fsync invoked on a HDFS file really really persists data! The datanode
-    moves blocks in the tmp directory to the real block directory on a datanode-restart.
-    (dhruba)
+    HADOOP-3113. An fsync invoked on a HDFS file really really
+    persists data! The datanode moves blocks in the tmp directory to 
+    the real block directory on a datanode-restart. (dhruba)
 
     HADOOP-3452. Change fsck to return non-zero status for a corrupt
     FileSystem. (lohit vijayarenu via cdouglas)
@@ -157,6 +158,9 @@
 
     HADOOP-3502. Quota API needs documentation in Forrest. (hairong)
 
+    HADOOP-3413. Allow SequenceFile.Reader to use serialization
+    framework. (tomwhite via omalley)
+
   IMPROVEMENTS
    
     HADOOP-2928. Remove deprecated FileSystem.getContentLength().

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?rev=664159&r1=664158&r2=664159&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Fri Jun  6 15:18:48 2008
@@ -33,6 +33,7 @@
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.io.compress.zlib.ZlibFactory;
+import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.conf.*;
@@ -1403,6 +1404,9 @@
     private CompressionInputStream valInFilter = null;
     private DataInputStream valIn = null;
     private Decompressor valDecompressor = null;
+    
+    private Deserializer keyDeserializer;
+    private Deserializer valDeserializer;
 
     /** Open the named file. */
     public Reader(FileSystem fs, Path file, Configuration conf)
@@ -1540,9 +1544,27 @@
                                                    valLenDecompressor);
           valLenIn = new DataInputStream(valLenInFilter);
         }
+        
+        SerializationFactory serializationFactory =
+          new SerializationFactory(conf);
+        this.keyDeserializer =
+          getDeserializer(serializationFactory, getKeyClass());
+        if (!blockCompressed) {
+          this.keyDeserializer.open(valBuffer);
+        } else {
+          this.keyDeserializer.open(keyIn);
+        }
+        this.valDeserializer =
+          getDeserializer(serializationFactory, getValueClass());
+        this.valDeserializer.open(valIn);
       }
     }
     
+    @SuppressWarnings("unchecked")
+    private Deserializer getDeserializer(SerializationFactory sf, Class c) {
+      return sf.getDeserializer(c);
+    }
+    
     /** Close the file. */
     public synchronized void close() throws IOException {
       // Return the decompressors to the pool
@@ -1551,6 +1573,13 @@
       CodecPool.returnDecompressor(valLenDecompressor);
       CodecPool.returnDecompressor(valDecompressor);
       
+      if (keyDeserializer != null) {
+    	keyDeserializer.close();
+      }
+      if (valDeserializer != null) {
+        valDeserializer.close();
+      }
+      
       // Close the input-stream
       in.close();
     }
@@ -1743,6 +1772,51 @@
 
     }
     
+    /**
+     * Get the 'value' corresponding to the last read 'key'.
+     * @param val : The 'value' to be read.
+     * @throws IOException
+     */
+    public synchronized Object getCurrentValue(Object val) 
+      throws IOException {
+      if (val instanceof Configurable) {
+        ((Configurable) val).setConf(this.conf);
+      }
+
+      // Position stream to 'current' value
+      seekToCurrentValue();
+
+      if (!blockCompressed) {
+        val = deserializeValue(val);
+        
+        if (valIn.read() > 0) {
+          LOG.info("available bytes: " + valIn.available());
+          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
+                                + " bytes, should read " +
+                                (valBuffer.getLength()-keyLength));
+        }
+      } else {
+        // Get the value
+        int valLength = WritableUtils.readVInt(valLenIn);
+        val = deserializeValue(val);
+        
+        // Read another compressed 'value'
+        --noBufferedValues;
+        
+        // Sanity check
+        if (valLength < 0) {
+          LOG.debug(val + " is a zero-length value");
+        }
+      }
+      return val;
+
+    }
+
+    @SuppressWarnings("unchecked")
+    private Object deserializeValue(Object val) throws IOException {
+      return valDeserializer.deserialize(val);
+    }
+    
     /** Read the next key in the file into <code>key</code>, skipping its
      * value.  True if another entry exists, and false at end of file. */
     public synchronized boolean next(Writable key) throws IOException {
@@ -1974,6 +2048,60 @@
       
     }
 
+    /** Read the next key in the file, skipping its
+     * value.  Return null at end of file. */
+    public synchronized Object next(Object key) throws IOException {
+      if (key != null && key.getClass() != getKeyClass()) {
+        throw new IOException("wrong key class: "+key.getClass().getName()
+                              +" is not "+keyClass);
+      }
+
+      if (!blockCompressed) {
+        outBuf.reset();
+        
+        keyLength = next(outBuf);
+        if (keyLength < 0)
+          return null;
+        
+        valBuffer.reset(outBuf.getData(), outBuf.getLength());
+        
+        key = deserializeKey(key);
+        valBuffer.mark(0);
+        if (valBuffer.getPosition() != keyLength)
+          throw new IOException(key + " read " + valBuffer.getPosition()
+                                + " bytes, should read " + keyLength);
+      } else {
+        //Reset syncSeen
+        syncSeen = false;
+        
+        if (noBufferedKeys == 0) {
+          try {
+            readBlock();
+          } catch (EOFException eof) {
+            return null;
+          }
+        }
+        
+        int keyLength = WritableUtils.readVInt(keyLenIn);
+        
+        // Sanity check
+        if (keyLength < 0) {
+          return null;
+        }
+        
+        //Read another compressed 'key'
+        key = deserializeKey(key);
+        --noBufferedKeys;
+      }
+
+      return key;
+    }
+
+    @SuppressWarnings("unchecked")
+    private Object deserializeKey(Object key) throws IOException {
+      return keyDeserializer.deserialize(key);
+    }
+
     /**
      * Read 'raw' values.
      * @param val - The 'raw' value

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java?rev=664159&r1=664158&r2=664159&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java Fri Jun  6 15:18:48 2008
@@ -20,7 +20,6 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
 import java.security.DigestException;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
@@ -29,11 +28,8 @@
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
@@ -42,8 +38,7 @@
  * 
  */
 
-public class SequenceFileInputFilter<K extends WritableComparable,
-                                     V extends Writable>
+public class SequenceFileInputFilter<K, V>
   extends SequenceFileInputFormat<K, V> {
   
   final private static String FILTER_CLASS = "sequencefile.filter.class";
@@ -89,11 +84,11 @@
      * @param key record key
      * @return true if a record is accepted; return false otherwise
      */
-    public abstract boolean accept(Writable key);
+    public abstract boolean accept(Object key);
   }
     
   /**
-   * base calss for Filters
+   * base class for Filters
    */
   public static abstract class FilterBase implements Filter {
     Configuration conf;
@@ -136,9 +131,9 @@
 
     /** Filtering method
      * If key matches the regex, return true; otherwise return false
-     * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(org.apache.hadoop.io.Writable)
+     * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(Object)
      */
-    public boolean accept(Writable key) {
+    public boolean accept(Object key) {
       return p.matcher(key.toString()).matches();
     }
   }
@@ -180,9 +175,9 @@
 
     /** Filtering method
      * If record# % frequency==0, return true; otherwise return false
-     * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(org.apache.hadoop.io.Writable)
+     * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(Object)
      */
-    public boolean accept(Writable key) {
+    public boolean accept(Object key) {
       boolean accepted = false;
       if (count == 0)
         accepted = true;
@@ -241,9 +236,9 @@
 
     /** Filtering method
      * If MD5(key) % frequency==0, return true; otherwise return false
-     * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(org.apache.hadoop.io.Writable)
+     * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(Object)
      */
-    public boolean accept(Writable key) {
+    public boolean accept(Object key) {
       try {
         long hashcode;
         if (key instanceof Text) {
@@ -282,8 +277,7 @@
     }
   }
     
-  private static class FilterRecordReader<K extends WritableComparable,
-                                          V extends Writable>
+  private static class FilterRecordReader<K, V>
     extends SequenceFileRecordReader<K, V> {
     
     private Filter filter;

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFormat.java?rev=664159&r1=664158&r2=664159&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFormat.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFormat.java Fri Jun  6 15:18:48 2008
@@ -24,13 +24,9 @@
 
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.MapFile;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 
 /** An {@link InputFormat} for {@link SequenceFile}s. */
-public class SequenceFileInputFormat<K extends WritableComparable,
-                                     V extends Writable>
-  extends FileInputFormat<K, V> {
+public class SequenceFileInputFormat<K, V> extends FileInputFormat<K, V> {
 
   public SequenceFileInputFormat() {
     setMinSplitSize(SequenceFile.SYNC_INTERVAL);

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java?rev=664159&r1=664158&r2=664159&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java Fri Jun  6 15:18:48 2008
@@ -26,8 +26,6 @@
 import org.apache.hadoop.fs.FileUtil;
 
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
@@ -35,9 +33,7 @@
 import org.apache.hadoop.util.*;
 
 /** An {@link OutputFormat} that writes {@link SequenceFile}s. */
-public class SequenceFileOutputFormat <K extends WritableComparable,
-                                       V extends Writable>
-extends FileOutputFormat<K, V> {
+public class SequenceFileOutputFormat <K,V> extends FileOutputFormat<K, V> {
 
   public RecordWriter<K, V> getRecordWriter(
                                           FileSystem ignored, JobConf job,

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java?rev=664159&r1=664158&r2=664159&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java Fri Jun  6 15:18:48 2008
@@ -28,9 +28,7 @@
 import org.apache.hadoop.util.ReflectionUtils;
 
 /** An {@link RecordReader} for {@link SequenceFile}s. */
-public class SequenceFileRecordReader<K extends WritableComparable,
-                                      V extends Writable>
-  implements RecordReader<K, V> {
+public class SequenceFileRecordReader<K, V> implements RecordReader<K, V> {
   
   private SequenceFile.Reader in;
   private long start;
@@ -55,11 +53,11 @@
 
 
   /** The class of key that must be passed to {@link
-   * #next(WritableComparable,Writable)}.. */
+   * #next(Object, Object)}.. */
   public Class getKeyClass() { return in.getKeyClass(); }
 
   /** The class of value that must be passed to {@link
-   * #next(WritableComparable,Writable)}.. */
+   * #next(Object, Object)}.. */
   public Class getValueClass() { return in.getValueClass(); }
   
   @SuppressWarnings("unchecked")
@@ -76,11 +74,14 @@
   public synchronized boolean next(K key, V value) throws IOException {
     if (!more) return false;
     long pos = in.getPosition();
-    boolean eof = in.next(key, value);
+    boolean remaining = (in.next(key) != null);
+    if (remaining) {
+      getCurrentValue(value);
+    }
     if (pos >= end && in.syncSeen()) {
       more = false;
     } else {
-      more = eof;
+      more = remaining;
     }
     return more;
   }
@@ -89,11 +90,11 @@
     throws IOException {
     if (!more) return false;
     long pos = in.getPosition();
-    boolean eof = in.next(key);
+    boolean remaining = (in.next(key) != null);
     if (pos >= end && in.syncSeen()) {
       more = false;
     } else {
-      more = eof;
+      more = remaining;
     }
     return more;
   }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java?rev=664159&r1=664158&r2=664159&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java Fri Jun  6 15:18:48 2008
@@ -24,8 +24,6 @@
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.RecordWriter;
@@ -39,10 +37,10 @@
  * 
  * Case one: This class is used for a map reduce job with at least one reducer.
  * The reducer wants to write data to different files depending on the actual
- * keys. It is assumed that a key (or value) enocodes the actual key (value)
+ * keys. It is assumed that a key (or value) encodes the actual key (value)
  * and the desired location for the actual key (value).
  * 
- * Case two: Tis class is used for a map only job. The job wants to use an
+ * Case two: This class is used for a map only job. The job wants to use an
  * output file name that is either a part of the input file name of the input
  * data, or some derivation of it.
  * 
@@ -50,8 +48,7 @@
  * output file name that depends on both the keys and the input file name,
  * 
  */
-public abstract class MultipleOutputFormat<K extends WritableComparable,
-                                           V extends Writable>
+public abstract class MultipleOutputFormat<K, V>
 extends FileOutputFormat<K, V> {
 
   /**

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java?rev=664159&r1=664158&r2=664159&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java Fri Jun  6 15:18:48 2008
@@ -32,21 +32,20 @@
  * This class extends the MultipleOutputFormat, allowing to write the output data 
  * to different output files in sequence file output format. 
  */
-public class MultipleSequenceFileOutputFormat 
-extends MultipleOutputFormat<WritableComparable, Writable> {
+public class MultipleSequenceFileOutputFormat <K,V>
+extends MultipleOutputFormat<K, V> {
 
-  private SequenceFileOutputFormat theSequenceFileOutputFormat = null;
+    private SequenceFileOutputFormat<K,V> theSequenceFileOutputFormat = null;
   
   @Override
   @SuppressWarnings("unchecked") 
-  protected RecordWriter<WritableComparable, Writable> getBaseRecordWriter(
-                                                         FileSystem fs,
-                                                         JobConf job,
-                                                         String name,
-                                                         Progressable arg3) 
+  protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs,
+                                                   JobConf job,
+                                                   String name,
+                                                   Progressable arg3) 
   throws IOException {
     if (theSequenceFileOutputFormat == null) {
-      theSequenceFileOutputFormat = new SequenceFileOutputFormat();
+      theSequenceFileOutputFormat = new SequenceFileOutputFormat<K,V>();
     }
     return theSequenceFileOutputFormat.getRecordWriter(fs, job, name, arg3);
   }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java?rev=664159&r1=664158&r2=664159&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java Fri Jun  6 15:18:48 2008
@@ -21,8 +21,6 @@
 import java.io.IOException;
 
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.TextOutputFormat;
@@ -32,7 +30,7 @@
  * This class extends the MultipleOutputFormat, allowing to write the output
  * data to different output files in Text output format.
  */
-public class MultipleTextOutputFormat<K extends WritableComparable, V extends Writable>
+public class MultipleTextOutputFormat<K, V>
     extends MultipleOutputFormat<K, V> {
 
   private TextOutputFormat<K, V> theTextOutputFormat = null;

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java?rev=664159&r1=664158&r2=664159&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java Fri Jun  6 15:18:48 2008
@@ -19,8 +19,6 @@
 package org.apache.hadoop.mapred.lib;
 
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.MapRunnable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
@@ -49,10 +47,7 @@
  * value is 10 threads.
  * <p>
  */
-public class MultithreadedMapRunner<K1 extends WritableComparable,
-                                    V1 extends Writable,
-                                    K2 extends WritableComparable,
-                                    V2 extends Writable>
+public class MultithreadedMapRunner<K1, V1, K2, V2>
     implements MapRunnable<K1, V1, K2, V2> {
 
   private static final Log LOG =