You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2010/12/04 08:13:12 UTC

svn commit: r1042107 [1/6] - in /hadoop/common/branches/HADOOP-6685: ./ ivy/ src/java/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/io/file/tfile/ src/java/org/apache/hadoop/io/serial/ src/java/org/apache/had...

Author: omalley
Date: Sat Dec  4 07:13:10 2010
New Revision: 1042107

URL: http://svn.apache.org/viewvc?rev=1042107&view=rev
Log:
HADOOP-6685. Add new generic serialization interface.

Added:
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/RawComparator.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/Serialization.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/SerializationFactory.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/TypedSerialization.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/CompatibilitySerialization.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/DeserializationRawComparator.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/JavaSerialization.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/MemcmpRawComparator.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/SerializationMetadata.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/WritableSerialization.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/avro/
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/avro/AvroComparator.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/avro/AvroReflectSerializable.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/avro/AvroSerialization.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/avro/package.html
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/protobuf/
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/protobuf/ProtoBufComparator.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/protobuf/ProtoBufSerialization.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/thrift/
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/thrift/StreamTransport.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/thrift/ThriftSerialization.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/package-info.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serializer/package-info.java
    hadoop/common/branches/HADOOP-6685/src/protobuf/
    hadoop/common/branches/HADOOP-6685/src/protobuf/SerializationMetadata.proto
    hadoop/common/branches/HADOOP-6685/src/test/core/org/apache/hadoop/io/AvroKey.java
    hadoop/common/branches/HADOOP-6685/src/test/core/org/apache/hadoop/io/AvroValue.java
    hadoop/common/branches/HADOOP-6685/src/test/core/org/apache/hadoop/io/ProtoTest.java
    hadoop/common/branches/HADOOP-6685/src/test/core/org/apache/hadoop/io/ProtoTest.proto
    hadoop/common/branches/HADOOP-6685/src/test/core/org/apache/hadoop/io/ThriftKey.java
    hadoop/common/branches/HADOOP-6685/src/test/core/org/apache/hadoop/io/ThriftValue.java
    hadoop/common/branches/HADOOP-6685/src/test/core/org/apache/hadoop/io/test.genavro
    hadoop/common/branches/HADOOP-6685/src/test/core/org/apache/hadoop/io/test.thrift
Removed:
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serializer/package.html
Modified:
    hadoop/common/branches/HADOOP-6685/build.xml
    hadoop/common/branches/HADOOP-6685/ivy.xml
    hadoop/common/branches/HADOOP-6685/ivy/hadoop-common-template.xml
    hadoop/common/branches/HADOOP-6685/ivy/ivysettings.xml
    hadoop/common/branches/HADOOP-6685/ivy/libraries.properties
    hadoop/common/branches/HADOOP-6685/src/java/core-default.xml
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/ArrayFile.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/BloomMapFile.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/DataInputBuffer.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/DefaultStringifier.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/MapFile.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/RawComparator.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/SequenceFile.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/SetFile.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/BCFile.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/CompareUtils.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/Compression.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/RawComparable.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/TFile.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/TFileDumper.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/Utils.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serializer/Deserializer.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serializer/DeserializerComparator.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serializer/JavaSerialization.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serializer/JavaSerializationComparator.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serializer/Serialization.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serializer/SerializationFactory.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serializer/Serializer.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serializer/WritableSerialization.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerializable.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/security/SaslRpcServer.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/util/Options.java
    hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/util/ReflectionUtils.java
    hadoop/common/branches/HADOOP-6685/src/test/core/org/apache/hadoop/io/RandomDatum.java
    hadoop/common/branches/HADOOP-6685/src/test/core/org/apache/hadoop/io/TestDefaultStringifier.java
    hadoop/common/branches/HADOOP-6685/src/test/core/org/apache/hadoop/io/TestMapFile.java
    hadoop/common/branches/HADOOP-6685/src/test/core/org/apache/hadoop/io/TestSequenceFileSerialization.java
    hadoop/common/branches/HADOOP-6685/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileJClassComparatorByteArrays.java
    hadoop/common/branches/HADOOP-6685/src/test/findbugsExcludeFile.xml

Modified: hadoop/common/branches/HADOOP-6685/build.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/build.xml?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/build.xml (original)
+++ hadoop/common/branches/HADOOP-6685/build.xml Sat Dec  4 07:13:10 2010
@@ -96,7 +96,7 @@
   <property name="test.all.tests.file" value="${test.src.dir}/all-tests"/>
 
   <property name="javadoc.link.java"
-	    value="http://java.sun.com/javase/6/docs/api/"/>
+	    value="http://download.oracle.com/javase/6/docs/api"/>
   <property name="javadoc.packages" value="org.apache.hadoop.*"/>
   <property name="javadoc.maxmemory" value="512m" />
 

Modified: hadoop/common/branches/HADOOP-6685/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/ivy.xml?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/ivy.xml (original)
+++ hadoop/common/branches/HADOOP-6685/ivy.xml Sat Dec  4 07:13:10 2010
@@ -269,6 +269,22 @@
       <exclude module="jetty"/>
       <exclude module="slf4j-simple"/>
     </dependency>
+    <dependency org="com.google.protobuf"
+      name="protobuf-java"
+      rev="${protobuf.version}"
+      conf="common->default"/>
+    <dependency org="org.apache.hadoop"
+      name="libthrift"
+      rev="${thrift.version}"
+      conf="common->default">
+      <exclude module="servlet-api"/>
+      <exclude module="slf4j-api"/>
+      <exclude module="slf4j-log4j12"/>
+    </dependency>
+    <dependency org="org.yaml"
+      name="snakeyaml"
+      rev="${snakeyaml.version}"
+      conf="common->default"/>
     <dependency org="org.codehaus.jackson"
       name="jackson-mapper-asl"
       rev="${jackson.version}"

Modified: hadoop/common/branches/HADOOP-6685/ivy/hadoop-common-template.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/ivy/hadoop-common-template.xml?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/ivy/hadoop-common-template.xml (original)
+++ hadoop/common/branches/HADOOP-6685/ivy/hadoop-common-template.xml Sat Dec  4 07:13:10 2010
@@ -119,6 +119,21 @@
       <version>2.0.8</version>
     </dependency>
     <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>2.3.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>libthrift</artifactId>
+      <version>0.5.0.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.yaml</groupId>
+      <artifactId>snakeyaml</artifactId>
+      <version>1.7</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>avro</artifactId>
       <version>1.3.2</version>

Modified: hadoop/common/branches/HADOOP-6685/ivy/ivysettings.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/ivy/ivysettings.xml?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/ivy/ivysettings.xml (original)
+++ hadoop/common/branches/HADOOP-6685/ivy/ivysettings.xml Sat Dec  4 07:13:10 2010
@@ -30,6 +30,9 @@
  <resolvers>
    <!--ibiblio resolvers-->
     <ibiblio name="maven2" root="${repo.maven.org}" m2compatible="true"/>
+    <ibiblio name="apache" 
+       root="https://repository.apache.org/content/repositories/releases" 
+       m2compatible="true"/>
 
     <filesystem name="fs" m2compatible="true" force="true">
        <artifact pattern="${repo.dir}/[organisation]/[module]/[revision]/[module]-[revision].[ext]"/>
@@ -37,6 +40,7 @@
     </filesystem>
 
     <chain name="default" dual="true">
+      <resolver ref="apache"/>
       <resolver ref="maven2"/>
     </chain>
 

Modified: hadoop/common/branches/HADOOP-6685/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/ivy/libraries.properties?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/ivy/libraries.properties (original)
+++ hadoop/common/branches/HADOOP-6685/ivy/libraries.properties Sat Dec  4 07:13:10 2010
@@ -62,6 +62,8 @@ mina-core.version=2.0.0-M5
 
 oro.version=2.0.8
 
+protobuf.version=2.3.0
+
 rats-lib.version=0.6
 
 servlet.version=4.0.6
@@ -69,6 +71,9 @@ servlet-api-2.5.version=6.1.14
 servlet-api.version=2.5
 slf4j-api.version=1.5.11
 slf4j-log4j12.version=1.5.11
+snakeyaml.version=1.7
+
+thrift.version=0.5.0.0
 
 wagon-http.version=1.0-beta-2
 

Modified: hadoop/common/branches/HADOOP-6685/src/java/core-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/core-default.xml?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/core-default.xml (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/core-default.xml Sat Dec  4 07:13:10 2010
@@ -155,8 +155,8 @@
 </property>
 
 <property>
-  <name>io.serializations</name>
-  <value>org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,org.apache.hadoop.io.serializer.avro.AvroReflectSerialization</value>
+  <name>hadoop.serializations</name>
+  <value>org.apache.hadoop.io.serial.lib.WritableSerialization,org.apache.hadoop.io.serial.lib.protobuf.ProtoBufSerialization,org.apache.hadoop.io.serial.lib.thrift.ThriftSerialization,org.apache.hadoop.io.serial.lib.avro.AvroSerialization,org.apache.hadoop.io.serial.lib.CompatibilitySerialization</value>
   <description>A list of serialization classes that can be used for
   obtaining serializers and deserializers.</description>
 </property>

Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java Sat Dec  4 07:13:10 2010
@@ -138,9 +138,11 @@ public class CommonConfigurationKeysPubl
   public static final String  IO_SORT_FACTOR_KEY = "io.sort.factor";
   /** Default value for IO_SORT_FACTOR_DEFAULT */
   public static final int     IO_SORT_FACTOR_DEFAULT = 100;
-  /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
+  /** Defines the list of the deprecated serializations. */
   public static final String  IO_SERIALIZATIONS_KEY = "io.serializations";
-
+  /** Defines the list of serializations */
+  public static final String  HADOOP_SERIALIZATIONS_KEY = "hadoop.serializations";
+  
   /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
   public static final String  TFILE_IO_CHUNK_SIZE_KEY = "tfile.io.chunk.size";
   /** Default value for TFILE_IO_CHUNK_SIZE_DEFAULT */

Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/ArrayFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/ArrayFile.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/ArrayFile.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/ArrayFile.java Sat Dec  4 07:13:10 2010
@@ -40,7 +40,7 @@ public class ArrayFile extends MapFile {
 
     /** Create the named file for values of the named class. */
     public Writer(Configuration conf, FileSystem fs,
-                  String file, Class<? extends Writable> valClass)
+                  String file, Class<?> valClass)
       throws IOException {
       super(conf, new Path(file), keyClass(LongWritable.class), 
             valueClass(valClass));
@@ -48,7 +48,7 @@ public class ArrayFile extends MapFile {
 
     /** Create the named file for values of the named class. */
     public Writer(Configuration conf, FileSystem fs,
-                  String file, Class<? extends Writable> valClass,
+                  String file, Class<?> valClass,
                   CompressionType compress, Progressable progress)
       throws IOException {
       super(conf, new Path(file), 
@@ -59,7 +59,7 @@ public class ArrayFile extends MapFile {
     }
 
     /** Append a value to the file. */
-    public synchronized void append(Writable value) throws IOException {
+    public synchronized void append(Object value) throws IOException {
       super.append(count, value);                 // add to map
       count.set(count.get()+1);                   // increment count
     }
@@ -81,20 +81,31 @@ public class ArrayFile extends MapFile {
       seek(key);
     }
 
-    /** Read and return the next value in the file. */
+    @Deprecated
     public synchronized Writable next(Writable value) throws IOException {
-      return next(key, value) ? value : null;
+      return (Writable) next((Object) value);
+    }
+
+    /** Read and return the next value in the file. */
+    public synchronized Object next(Object value) throws IOException {
+      key = (LongWritable) nextKey(key);
+      return key == null? null : getCurrentValue(value);
     }
 
     /** Returns the key associated with the most recent call to {@link
-     * #seek(long)}, {@link #next(Writable)}, or {@link
-     * #get(long,Writable)}. */
+     * #seek(long)}, {@link #next(Object)}, or {@link
+     * #get(long,Object)}. */
     public synchronized long key() throws IOException {
       return key.get();
     }
 
+    @Deprecated
+    public synchronized Writable get(long n, Writable value) throws IOException{
+      return (Writable) get(n, (Object) value);
+    }
+
     /** Return the <code>n</code>th value in the file. */
-    public synchronized Writable get(long n, Writable value)
+    public synchronized Object get(long n, Object value)
       throws IOException {
       key.set(n);
       return get(key, value);

Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/BloomMapFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/BloomMapFile.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/BloomMapFile.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/BloomMapFile.java Sat Dec  4 07:13:10 2010
@@ -31,7 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.util.Options;
+import org.apache.hadoop.io.serial.Serialization;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.bloom.DynamicBloomFilter;
 import org.apache.hadoop.util.bloom.Filter;
@@ -42,7 +42,7 @@ import org.apache.hadoop.util.hash.Hash;
  * This class extends {@link MapFile} and provides very much the same
  * functionality. However, it uses dynamic Bloom filters to provide
  * quick membership test for keys, and it offers a fast version of 
- * {@link Reader#get(WritableComparable, Writable)} operation, especially in
+ * {@link Reader#get(Object, Object)} operation, especially in
  * case of sparsely populated MapFile-s.
  */
 @InterfaceAudience.Public
@@ -82,7 +82,9 @@ public class BloomMapFile {
     private DataOutputBuffer buf = new DataOutputBuffer();
     private FileSystem fs;
     private Path dir;
+    private final Serialization<Object> keySerialization;
     
+    @SuppressWarnings("unchecked")
     @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
         Class<? extends WritableComparable> keyClass,
@@ -92,6 +94,7 @@ public class BloomMapFile {
            compression(compress, codec), progressable(progress));
     }
 
+    @SuppressWarnings("unchecked")
     @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
         Class<? extends WritableComparable> keyClass,
@@ -101,6 +104,7 @@ public class BloomMapFile {
            compression(compress), progressable(progress));
     }
 
+    @SuppressWarnings("unchecked")
     @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
         Class<? extends WritableComparable> keyClass,
@@ -110,6 +114,7 @@ public class BloomMapFile {
            compression(compress));
     }
 
+    @SuppressWarnings("unchecked")
     @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
         WritableComparator comparator, Class valClass,
@@ -120,6 +125,7 @@ public class BloomMapFile {
            progressable(progress));
     }
 
+    @SuppressWarnings("unchecked")
     @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
         WritableComparator comparator, Class valClass,
@@ -129,6 +135,7 @@ public class BloomMapFile {
            progressable(progress));
     }
 
+    @SuppressWarnings("unchecked")
     @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
         WritableComparator comparator, Class valClass, CompressionType compress)
@@ -137,6 +144,7 @@ public class BloomMapFile {
            valueClass(valClass), compression(compress));
     }
 
+    @SuppressWarnings("unchecked")
     @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
         WritableComparator comparator, Class valClass) throws IOException {
@@ -144,6 +152,7 @@ public class BloomMapFile {
            valueClass(valClass));
     }
 
+    @SuppressWarnings("unchecked")
     @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
                   Class<? extends WritableComparable> keyClass,
@@ -151,12 +160,14 @@ public class BloomMapFile {
       this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass));
     }
 
+    @SuppressWarnings("unchecked")
     public Writer(Configuration conf, Path dir, 
                   SequenceFile.Writer.Option... options) throws IOException {
       super(conf, dir, options);
       this.fs = dir.getFileSystem(conf);
       this.dir = dir;
       initBloomFilter(conf);
+      keySerialization = (Serialization<Object>) getKeySerialization();
     }
 
     private synchronized void initBloomFilter(Configuration conf) {
@@ -174,11 +185,10 @@ public class BloomMapFile {
     }
 
     @Override
-    public synchronized void append(WritableComparable key, Writable val)
-        throws IOException {
+    public synchronized void append(Object key, Object val) throws IOException {
       super.append(key, val);
       buf.reset();
-      key.write(buf);
+      keySerialization.serialize(buf, key);
       bloomKey.set(byteArrayForBloomKey(buf), 1.0);
       bloomFilter.add(bloomKey);
     }
@@ -198,11 +208,14 @@ public class BloomMapFile {
     private DynamicBloomFilter bloomFilter;
     private DataOutputBuffer buf = new DataOutputBuffer();
     private Key bloomKey = new Key();
+    private final Serialization<Object> keySerialization;
 
+    @SuppressWarnings("unchecked")
     public Reader(Path dir, Configuration conf,
                   SequenceFile.Reader.Option... options) throws IOException {
       super(dir, conf, options);
       initBloomFilter(dir, conf);
+      keySerialization = (Serialization<Object>) getKeySerialization();
     }
 
     @Deprecated
@@ -245,26 +258,40 @@ public class BloomMapFile {
      * @return  false iff key doesn't exist, true if key probably exists.
      * @throws IOException
      */
-    public boolean probablyHasKey(WritableComparable key) throws IOException {
+    public boolean probablyHasKey(Object key) throws IOException {
       if (bloomFilter == null) {
         return true;
       }
       buf.reset();
-      key.write(buf);
+      keySerialization.serialize(buf, key);
       bloomKey.set(byteArrayForBloomKey(buf), 1.0);
       return bloomFilter.membershipTest(bloomKey);
     }
     
     /**
      * Fast version of the
-     * {@link MapFile.Reader#get(WritableComparable, Writable)} method. First
+     * {@link MapFile.Reader#get(Object, Object)} method. First
      * it checks the Bloom filter for the existence of the key, and only if
      * present it performs the real get operation. This yields significant
      * performance improvements for get operations on sparsely populated files.
      */
+    @SuppressWarnings("unchecked")
+    @Deprecated
     @Override
-    public synchronized Writable get(WritableComparable key, Writable val)
-        throws IOException {
+    public synchronized Writable get(WritableComparable key,
+                                     Writable value) throws IOException {
+      return (Writable) get((Object) key, (Object) value);
+    }
+
+    /**
+     * Fast version of the
+     * {@link MapFile.Reader#get(Object, Object)} method. First
+     * it checks the Bloom filter for the existence of the key, and only if
+     * present it performs the real get operation. This yields significant
+     * performance improvements for get operations on sparsely populated files.
+     */
+    @Override
+    public synchronized Object get(Object key, Object val) throws IOException {
       if (!probablyHasKey(key)) {
         return null;
       }

Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/DataInputBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/DataInputBuffer.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/DataInputBuffer.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/DataInputBuffer.java Sat Dec  4 07:13:10 2010
@@ -93,4 +93,20 @@ public class DataInputBuffer extends Dat
   /** Returns the length of the input. */
   public int getLength() { return buffer.getLength(); }
 
+  public String toString() {
+    StringBuilder sb = new StringBuilder(3 * buffer.getLength() + 10);
+    byte[] bytes = getData();
+    for(int i=0; i < buffer.getLength(); i++) {
+      sb.append(' ');
+      String num = Integer.toHexString(0xff & bytes[i]);
+      // if it is only one digit, add a leading 0.
+      if (num.length() < 2) {
+        sb.append('0');
+      }
+      sb.append(num);
+    }
+    sb.append("; pos=");
+    sb.append(buffer.getPosition());
+    return sb.toString();
+  }
 }

Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/DefaultStringifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/DefaultStringifier.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/DefaultStringifier.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/DefaultStringifier.java Sat Dec  4 07:13:10 2010
@@ -26,17 +26,15 @@ import org.apache.commons.codec.binary.B
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.Serialization;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.io.serial.Serialization;
+import org.apache.hadoop.io.serial.SerializationFactory;
 import org.apache.hadoop.util.GenericsUtil;
 
 /**
  * DefaultStringifier is the default implementation of the {@link Stringifier}
  * interface which stringifies the objects using base64 encoding of the
- * serialized version of the objects. The {@link Serializer} and
- * {@link Deserializer} are obtained from the {@link SerializationFactory}.
+ * serialized version of the objects. The {@link Serialization}
+ * is obtained from the {@link SerializationFactory}.
  * <br>
  * DefaultStringifier offers convenience methods to store/load objects to/from
  * the configuration.
@@ -49,43 +47,37 @@ public class DefaultStringifier<T> imple
 
   private static final String SEPARATOR = ",";
 
-  private Serializer<T> serializer;
+  private final Serialization<T> serialization;
 
-  private Deserializer<T> deserializer;
+  private final DataInputBuffer inBuf;
 
-  private DataInputBuffer inBuf;
-
-  private DataOutputBuffer outBuf;
+  private final DataOutputBuffer outBuf;
+  private final Configuration conf;
 
+  @SuppressWarnings("unchecked")
   public DefaultStringifier(Configuration conf, Class<T> c) {
 
-    SerializationFactory factory = new SerializationFactory(conf);
-    this.serializer = factory.getSerializer(c);
-    this.deserializer = factory.getDeserializer(c);
+    SerializationFactory factory = SerializationFactory.getInstance(conf);
     this.inBuf = new DataInputBuffer();
     this.outBuf = new DataOutputBuffer();
-    try {
-      serializer.open(outBuf);
-      deserializer.open(inBuf);
-    } catch (IOException ex) {
-      throw new RuntimeException(ex);
-    }
+    this.conf = conf;
+    this.serialization = (Serialization<T>) factory.getSerializationByType(c);
   }
 
   public T fromString(String str) throws IOException {
     try {
       byte[] bytes = Base64.decodeBase64(str.getBytes("UTF-8"));
       inBuf.reset(bytes, bytes.length);
-      T restored = deserializer.deserialize(null);
+      T restored = serialization.deserialize(inBuf, null, conf);
       return restored;
     } catch (UnsupportedCharsetException ex) {
-      throw new IOException(ex.toString());
+      throw new IOException("problem finding utf-8", ex);
     }
   }
 
   public String toString(T obj) throws IOException {
     outBuf.reset();
-    serializer.serialize(obj);
+    serialization.serialize(outBuf, obj);
     byte[] buf = new byte[outBuf.getLength()];
     System.arraycopy(outBuf.getData(), 0, buf, 0, buf.length);
     return new String(Base64.encodeBase64(buf));
@@ -94,8 +86,6 @@ public class DefaultStringifier<T> imple
   public void close() throws IOException {
     inBuf.close();
     outBuf.close();
-    deserializer.close();
-    serializer.close();
   }
 
   /**

Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/MapFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/MapFile.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/MapFile.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/MapFile.java Sat Dec  4 07:13:10 2010
@@ -18,24 +18,25 @@
 
 package org.apache.hadoop.io;
 
+import java.io.EOFException;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.io.*;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.Options;
-import org.apache.hadoop.fs.*;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.serial.RawComparator;
+import org.apache.hadoop.io.serial.Serialization;
+import org.apache.hadoop.io.serial.SerializationFactory;
+import org.apache.hadoop.util.Progressable;
 
 /** A file-based map from keys to values.
  * 
@@ -68,8 +69,11 @@ public class MapFile {
 
   /** Writes a new map. */
   public static class Writer implements java.io.Closeable {
-    private SequenceFile.Writer data;
-    private SequenceFile.Writer index;
+    private final SequenceFile.Writer data;
+    private final SequenceFile.Writer index;
+    private final Configuration conf;
+    private final Serialization<Object> keySerialization;
+    private final Serialization<Object> valueSerialization;
 
     final private static String INDEX_INTERVAL = "io.map.index.interval";
     private int indexInterval = 128;
@@ -78,10 +82,11 @@ public class MapFile {
     private LongWritable position = new LongWritable();
 
     // the following fields are used only for checking key order
-    private WritableComparator comparator;
-    private DataInputBuffer inBuf = new DataInputBuffer();
-    private DataOutputBuffer outBuf = new DataOutputBuffer();
-    private WritableComparable lastKey;
+    private final RawComparator comparator;
+    private final DataInputBuffer inBuf = new DataInputBuffer();
+    private DataOutputBuffer lastKey;
+    private final DataOutputBuffer currentKey = new DataOutputBuffer();
+    private final DataOutputBuffer currentValue = new DataOutputBuffer();
 
     /** What's the position (in bytes) we wrote when we got the last index */
     private long lastIndexPos = -1;
@@ -97,6 +102,7 @@ public class MapFile {
     /** Create the named map for keys of the named class. 
      * @deprecated Use Writer(Configuration, Path, Option...) instead.
      */
+    @SuppressWarnings("unchecked")
     @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
                   Class<? extends WritableComparable> keyClass, 
@@ -107,6 +113,7 @@ public class MapFile {
     /** Create the named map for keys of the named class. 
      * @deprecated Use Writer(Configuration, Path, Option...) instead.
      */
+    @SuppressWarnings("unchecked")
     @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
                   Class<? extends WritableComparable> keyClass, Class valClass,
@@ -119,6 +126,7 @@ public class MapFile {
     /** Create the named map for keys of the named class. 
      * @deprecated Use Writer(Configuration, Path, Option...) instead.
      */
+    @SuppressWarnings("unchecked")
     @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
                   Class<? extends WritableComparable> keyClass, Class valClass,
@@ -131,6 +139,7 @@ public class MapFile {
     /** Create the named map for keys of the named class. 
      * @deprecated Use Writer(Configuration, Path, Option...) instead.
      */
+    @SuppressWarnings("unchecked")
     @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
                   Class<? extends WritableComparable> keyClass, Class valClass,
@@ -142,6 +151,7 @@ public class MapFile {
     /** Create the named map using the named key comparator. 
      * @deprecated Use Writer(Configuration, Path, Option...) instead.
      */
+    @SuppressWarnings("unchecked")
     @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
                   WritableComparator comparator, Class valClass
@@ -153,6 +163,7 @@ public class MapFile {
     /** Create the named map using the named key comparator. 
      * @deprecated Use Writer(Configuration, Path, Option...) instead.
      */
+    @SuppressWarnings("unchecked")
     @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
                   WritableComparator comparator, Class valClass,
@@ -164,6 +175,7 @@ public class MapFile {
     /** Create the named map using the named key comparator. 
      * @deprecated Use Writer(Configuration, Path, Option...)} instead.
      */
+    @SuppressWarnings("unchecked")
     @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
                   WritableComparator comparator, Class valClass,
@@ -177,6 +189,7 @@ public class MapFile {
     /** Create the named map using the named key comparator. 
      * @deprecated Use Writer(Configuration, Path, Option...) instead.
      */
+    @SuppressWarnings("unchecked")
     @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
                   WritableComparator comparator, Class valClass,
@@ -190,28 +203,18 @@ public class MapFile {
     // our options are a superset of sequence file writer options
     public static interface Option extends SequenceFile.Writer.Option { }
     
-    private static class KeyClassOption extends Options.ClassOption
-                                        implements Option {
-      KeyClassOption(Class<?> value) {
+    private static class ComparatorOption extends Options.ComparatorOption 
+                                          implements Option{
+      ComparatorOption(RawComparator value) {
         super(value);
       }
     }
-    
-    private static class ComparatorOption implements Option {
-      private final WritableComparator value;
-      ComparatorOption(WritableComparator value) {
-        this.value = value;
-      }
-      WritableComparator getValue() {
-        return value;
-      }
-    }
 
-    public static Option keyClass(Class<? extends WritableComparable> value) {
-      return new KeyClassOption(value);
+    public static SequenceFile.Writer.Option keyClass(Class<?> value) {
+      return new SequenceFile.Writer.KeyClassOption(value);
     }
     
-    public static Option comparator(WritableComparator value) {
+    public static Option comparator(RawComparator value) {
       return new ComparatorOption(value);
     }
 
@@ -234,31 +237,27 @@ public class MapFile {
       return SequenceFile.Writer.progressable(value);
     }
 
+    public static 
+    SequenceFile.Writer.Option keySerialization(Serialization<?> value) {
+      return SequenceFile.Writer.keySerialization(value);
+    }
+
+    public static 
+    SequenceFile.Writer.Option valueSerialization(Serialization<?> value) {
+      return SequenceFile.Writer.valueSerialization(value);
+    }
+ 
     @SuppressWarnings("unchecked")
     public Writer(Configuration conf, 
                   Path dirName,
                   SequenceFile.Writer.Option... opts
                   ) throws IOException {
-      KeyClassOption keyClassOption = 
-        Options.getOption(KeyClassOption.class, opts);
+      this.conf = conf;
       ComparatorOption comparatorOption =
         Options.getOption(ComparatorOption.class, opts);
-      if ((keyClassOption == null) == (comparatorOption == null)) {
-        throw new IllegalArgumentException("key class or comparator option "
-                                           + "must be set");
-      }
+      
       this.indexInterval = conf.getInt(INDEX_INTERVAL, this.indexInterval);
 
-      Class<? extends WritableComparable> keyClass;
-      if (keyClassOption == null) {
-        this.comparator = comparatorOption.getValue();
-        keyClass = comparator.getKeyClass();
-      } else {
-        keyClass= 
-          (Class<? extends WritableComparable>) keyClassOption.getValue();
-        this.comparator = WritableComparator.get(keyClass);
-      }
-      this.lastKey = comparator.newKey();
       FileSystem fs = dirName.getFileSystem(conf);
 
       if (!fs.mkdirs(dirName)) {
@@ -269,13 +268,18 @@ public class MapFile {
 
       SequenceFile.Writer.Option[] dataOptions =
         Options.prependOptions(opts, 
-                               SequenceFile.Writer.file(dataFile),
-                               SequenceFile.Writer.keyClass(keyClass));
+                               SequenceFile.Writer.file(dataFile));
       this.data = SequenceFile.createWriter(conf, dataOptions);
+      keySerialization = (Serialization<Object>) data.getKeySerialization();
+      valueSerialization = (Serialization<Object>) data.getValueSerialization();
+      if (comparatorOption != null) {
+        comparator = comparatorOption.getValue();
+      } else {
+        comparator = keySerialization.getRawComparator();
+      }
 
       SequenceFile.Writer.Option[] indexOptions =
         Options.prependOptions(opts, SequenceFile.Writer.file(indexFile),
-            SequenceFile.Writer.keyClass(keyClass),
             SequenceFile.Writer.valueClass(LongWritable.class),
             SequenceFile.Writer.compression(CompressionType.BLOCK));
       this.index = SequenceFile.createWriter(conf, indexOptions);      
@@ -296,6 +300,22 @@ public class MapFile {
       conf.setInt(INDEX_INTERVAL, interval);
     }
 
+    /**
+     * Get the serialization used for the keys
+     * @return the key serialization
+     */
+    public Serialization<?> getKeySerialization() {
+      return data.getKeySerialization();
+    }
+    
+    /**
+     * Get the serialization used for the values
+     * @return the value serialization
+     */
+    public Serialization<?> getValueSerialization() {
+      return data.getValueSerialization();
+    }
+    
     /** Close the map. */
     public synchronized void close() throws IOException {
       data.close();
@@ -304,10 +324,14 @@ public class MapFile {
 
     /** Append a key/value pair to the map.  The key must be greater or equal
      * to the previous key added to the map. */
-    public synchronized void append(WritableComparable key, Writable val)
+    public synchronized void append(Object key, Object val)
       throws IOException {
 
-      checkKey(key);
+      currentKey.reset();
+      keySerialization.serialize(currentKey, key);
+      checkKey(currentKey, key);
+      currentValue.reset();
+      valueSerialization.serialize(currentValue, val);
 
       long pos = data.getLength();      
       // Only write an index if we've changed positions. In a block compressed
@@ -323,17 +347,21 @@ public class MapFile {
       size++;
     }
 
-    private void checkKey(WritableComparable key) throws IOException {
+    private void checkKey(DataOutputBuffer serialKey, Object key
+                          ) throws IOException {
       // check that keys are well-ordered
-      if (size != 0 && comparator.compare(lastKey, key) > 0)
-        throw new IOException("key out of order: "+key+" after "+lastKey);
-          
-      // update lastKey with a copy of key by writing and reading
-      outBuf.reset();
-      key.write(outBuf);                          // write new key
-
-      inBuf.reset(outBuf.getData(), outBuf.getLength());
-      lastKey.readFields(inBuf);                  // read into lastKey
+      if (lastKey == null) {
+        lastKey = new DataOutputBuffer();
+      } else if (comparator.compare(lastKey.getData(), 0, lastKey.getLength(), 
+                                    serialKey.getData(),0,serialKey.getLength()) 
+                      > 0) {
+        // rebuild the previous key so that we can explain what's wrong
+        inBuf.reset(lastKey.getData(), 0, lastKey.getLength());
+        Object prevKey = keySerialization.deserialize(inBuf, null, conf);
+        throw new IOException("key out of order: "+ key +" after "+ prevKey);
+      }
+      lastKey.reset();
+      lastKey.write(serialKey.getData(), 0, serialKey.getLength());
     }
 
   }
@@ -346,9 +374,12 @@ public class MapFile {
      * files using less memory. */
     private int INDEX_SKIP = 0;
       
-    private WritableComparator comparator;
+    private RawComparator comparator;
+    private Serialization<Object> keySerialization;
+    private final Configuration conf;
 
-    private WritableComparable nextKey;
+    private DataOutputBuffer nextKey = new DataOutputBuffer();
+    private DataInputBuffer inBuf = new DataInputBuffer();
     private long seekPosition = -1;
     private int seekIndex = -1;
     private long firstPosition;
@@ -362,36 +393,55 @@ public class MapFile {
 
     // the index, in memory
     private int count = -1;
-    private WritableComparable[] keys;
+    private byte[][] keys;
     private long[] positions;
 
-    /** Returns the class of keys in this file. */
+    /** Returns the class of keys in this file. 
+     * @deprecated Use {@link #getKeySerialization} instead.
+     */
+    @Deprecated
     public Class<?> getKeyClass() { return data.getKeyClass(); }
 
-    /** Returns the class of values in this file. */
+    /** Returns the class of values in this file. 
+     * @deprecated Use {@link #getValueSerialization} instead.
+     */
+    @Deprecated
     public Class<?> getValueClass() { return data.getValueClass(); }
 
+    /**
+     * Get the key serialization for this map file.
+     * @return the serialization for the key
+     */
+    public Serialization<?> getKeySerialization() {
+      return keySerialization;
+    }
+    
+    /**
+     * Get the value serialization for this map file.
+     * @return the serialization for the value
+     */
+    public Serialization<?> getValueSerialization() {
+      return data.getValueSerialization();
+    }
     public static interface Option extends SequenceFile.Reader.Option {}
     
     public static Option comparator(WritableComparator value) {
       return new ComparatorOption(value);
     }
 
-    static class ComparatorOption implements Option {
-      private final WritableComparator value;
-      ComparatorOption(WritableComparator value) {
-        this.value = value;
-      }
-      WritableComparator getValue() {
-        return value;
+    static class ComparatorOption extends Options.ComparatorOption 
+                                          implements Option {
+      ComparatorOption(RawComparator value) {
+        super(value);
       }
     }
 
     public Reader(Path dir, Configuration conf,
                   SequenceFile.Reader.Option... opts) throws IOException {
+      this.conf = conf;
       ComparatorOption comparatorOption = 
         Options.getOption(ComparatorOption.class, opts);
-      WritableComparator comparator =
+      RawComparator comparator =
         comparatorOption == null ? null : comparatorOption.getValue();
       INDEX_SKIP = conf.getInt("io.map.index.skip", 0);
       open(dir, comparator, conf, opts);
@@ -415,8 +465,9 @@ public class MapFile {
       this(new Path(dirName), conf, comparator(comparator));
     }
     
+    @SuppressWarnings("unchecked")
     protected synchronized void open(Path dir,
-                                     WritableComparator comparator,
+                                     RawComparator comparator,
                                      Configuration conf, 
                                      SequenceFile.Reader.Option... options
                                      ) throws IOException {
@@ -426,13 +477,13 @@ public class MapFile {
       // open the data
       this.data = createDataFileReader(dataFile, conf, options);
       this.firstPosition = data.getPosition();
+      keySerialization = (Serialization<Object>) data.getKeySerialization();
 
-      if (comparator == null)
-        this.comparator = 
-          WritableComparator.get(data.getKeyClass().
-                                   asSubclass(WritableComparable.class));
-      else
+      if (comparator == null) {
+        this.comparator = keySerialization.getRawComparator();
+      } else {
         this.comparator = comparator;
+      }
 
       // open the index
       SequenceFile.Reader.Option[] indexOptions =
@@ -463,19 +514,25 @@ public class MapFile {
       try {
         int skip = INDEX_SKIP;
         LongWritable position = new LongWritable();
-        WritableComparable lastKey = null;
+        byte[] lastKey = null;
         long lastIndex = -1;
-        ArrayList<WritableComparable> keyBuilder = new ArrayList<WritableComparable>(1024);
-        while (true) {
-          WritableComparable k = comparator.newKey();
-
-          if (!index.next(k, position))
-            break;
+        ArrayList<byte[]> keyBuilder = new ArrayList<byte[]>(1024);
+        DataOutputBuffer key = new DataOutputBuffer();
+        while (index.nextRawKey(key) > 0) {
+          position = (LongWritable) index.getCurrentValue(position);
 
           // check order to make sure comparator is compatible
-          if (lastKey != null && comparator.compare(lastKey, k) > 0)
-            throw new IOException("key out of order: "+k+" after "+lastKey);
-          lastKey = k;
+          if (lastKey != null && 
+              comparator.compare(lastKey, 0, lastKey.length,
+                                 key.getData(), 0 , key.getLength()) > 0) {
+            inBuf.reset(lastKey, 0, lastKey.length);
+            Object prevKey = keySerialization.deserialize(inBuf, null, conf);
+            inBuf.reset(key.getData(), 0, key.getLength());
+            Object curKey = keySerialization.deserialize(inBuf, null, conf);
+            throw new IOException("key out of order: "+ curKey + " after " +
+                                  prevKey);
+          }
+          lastKey = Arrays.copyOf(key.getData(), key.getLength());
           if (skip > 0) {
             skip--;
             continue;                             // skip this entry
@@ -483,28 +540,28 @@ public class MapFile {
             skip = INDEX_SKIP;                    // reset skip
           }
 
-	  // don't read an index that is the same as the previous one. Block
-	  // compressed map files used to do this (multiple entries would point
-	  // at the same block)
-	  if (position.get() == lastIndex)
-	    continue;
+          // don't read an index that is the same as the previous one. Block
+          // compressed map files used to do this (multiple entries would point
+          // at the same block)
+          if (position.get() == lastIndex)
+            continue;
 
           if (count == positions.length) {
-	    positions = Arrays.copyOf(positions, positions.length * 2);
+            positions = Arrays.copyOf(positions, positions.length * 2);
           }
 
-          keyBuilder.add(k);
+          keyBuilder.add(lastKey);
           positions[count] = position.get();
           count++;
         }
 
-        this.keys = keyBuilder.toArray(new WritableComparable[count]);
+        this.keys = keyBuilder.toArray(new byte[count][]);
         positions = Arrays.copyOf(positions, count);
       } catch (EOFException e) {
         LOG.warn("Unexpected EOF reading " + index +
-                              " at entry #" + count + ".  Ignoring.");
+                 " at entry #" + count + ".  Ignoring.");
       } finally {
-	indexClosed = true;
+        indexClosed = true;
         index.close();
       }
     }
@@ -517,22 +574,23 @@ public class MapFile {
     /** Get the key at approximately the middle of the file. Or null if the
      *  file is empty. 
      */
-    public synchronized WritableComparable midKey() throws IOException {
+    public synchronized Object midKey() throws IOException {
 
       readIndex();
       if (count == 0) {
         return null;
       }
     
-      return keys[(count - 1) / 2];
+      byte[] rawKey = keys[(count -1) / 2];
+      inBuf.reset(rawKey, 0, rawKey.length);
+      return keySerialization.deserialize(inBuf, null, conf);
     }
     
     /** Reads the final key from the file.
      *
      * @param key key to read into
      */
-    public synchronized void finalKey(WritableComparable key)
-      throws IOException {
+    public synchronized Object finalKey(Object key) throws IOException {
 
       long originalPosition = data.getPosition(); // save position
       try {
@@ -542,8 +600,12 @@ public class MapFile {
         } else {
           reset();                                // start at the beginning
         }
-        while (data.next(key)) {}                 // scan to eof
-
+        Object prevKey = null;
+        do {
+          prevKey = key;
+          key = data.nextKey(key);
+        } while (key != null);
+        return prevKey;
       } finally {
         data.seek(originalPosition);              // restore position
       }
@@ -553,7 +615,7 @@ public class MapFile {
      * first entry after the named key.  Returns true iff the named key exists
      * in this map.
      */
-    public synchronized boolean seek(WritableComparable key) throws IOException {
+    public synchronized boolean seek(Object key) throws IOException {
       return seekInternal(key) == 0;
     }
 
@@ -565,7 +627,7 @@ public class MapFile {
      *          < 0 - positioned at next record
      *          1   - no more records in file
      */
-    private synchronized int seekInternal(WritableComparable key)
+    private synchronized int seekInternal(Object key)
       throws IOException {
       return seekInternal(key, false);
     }
@@ -582,19 +644,24 @@ public class MapFile {
      *          < 0 - positioned at next record
      *          1   - no more records in file
      */
-    private synchronized int seekInternal(WritableComparable key,
-        final boolean before)
-      throws IOException {
+    private synchronized int seekInternal(Object key,
+                                          final boolean before
+                                          ) throws IOException {
       readIndex();                                // make sure index is read
+      DataOutputBuffer keyBuffer = new DataOutputBuffer();
+      keySerialization.serialize(keyBuffer, key);
 
       if (seekIndex != -1                         // seeked before
           && seekIndex+1 < count           
-          && comparator.compare(key, keys[seekIndex+1])<0 // before next indexed
-          && comparator.compare(key, nextKey)
-          >= 0) {                                 // but after last seeked
+          && comparator.compare(keyBuffer.getData(), 0, keyBuffer.getLength(), 
+                                keys[seekIndex+1], 0, keys[seekIndex+1].length)
+                             < 0       // before next indexed
+          && comparator.compare(keyBuffer.getData(), 0, keyBuffer.getLength(), 
+                                nextKey.getData(), 0, nextKey.getLength())
+                             >= 0) {   // but after last seeked
         // do nothing
       } else {
-        seekIndex = binarySearch(key);
+        seekIndex = binarySearch(keyBuffer.getData(), keyBuffer.getLength());
         if (seekIndex < 0)                        // decode insertion point
           seekIndex = -seekIndex-2;
 
@@ -605,17 +672,15 @@ public class MapFile {
       }
       data.seek(seekPosition);
       
-      if (nextKey == null)
-        nextKey = comparator.newKey();
-     
       // If we're looking for the key before, we need to keep track
       // of the position we got the current key as well as the position
       // of the key before it.
       long prevPosition = -1;
       long curPosition = seekPosition;
 
-      while (data.next(nextKey)) {
-        int c = comparator.compare(key, nextKey);
+      while (data.nextRawKey(nextKey) != -1) {
+        int c = comparator.compare(keyBuffer.getData(), 0, keyBuffer.getLength(),
+                                   nextKey.getData(), 0 , nextKey.getLength());
         if (c <= 0) {                             // at or beyond desired
           if (before && c != 0) {
             if (prevPosition == -1) {
@@ -627,7 +692,7 @@ public class MapFile {
             } else {
               // We have a previous record to back up to
               data.seek(prevPosition);
-              data.next(nextKey);
+              data.nextRawKey(nextKey);
               // now that we've rewound, the search key must be greater than this key
               return 1;
             }
@@ -639,18 +704,24 @@ public class MapFile {
           curPosition = data.getPosition();
         }
       }
-
+      // if we have fallen off the end of the file and we want the before key
+      // then back up to the previous key
+      if (before && prevPosition != -1) {
+        data.seek(prevPosition);
+        data.nextRawKey(nextKey);
+      }
       return 1;
     }
 
-    private int binarySearch(WritableComparable key) {
+    private int binarySearch(byte[] key, int length) {
       int low = 0;
       int high = count-1;
 
       while (low <= high) {
         int mid = (low + high) >>> 1;
-        WritableComparable midVal = keys[mid];
-        int cmp = comparator.compare(midVal, key);
+        byte[] midVal = keys[mid];
+        int cmp = comparator.compare(midVal, 0, midVal.length,
+                                     key, 0, length);
 
         if (cmp < 0)
           low = mid + 1;
@@ -664,18 +735,59 @@ public class MapFile {
 
     /** Read the next key/value pair in the map into <code>key</code> and
      * <code>val</code>.  Returns true if such a pair exists and false when at
-     * the end of the map */
+     * the end of the map 
+     * @deprecated Use {@link #nextKey} and {@link #getCurrentValue} instead.
+     */
+    @SuppressWarnings("unchecked")
+    @Deprecated
     public synchronized boolean next(WritableComparable key, Writable val)
       throws IOException {
       return data.next(key, val);
     }
+    
+    /**
+     * Read the next key in the map.
+     * @param reusable an object that may be re-used for holding the next key
+     * @return the key that was read or null if there is not another key
+     * @throws IOException
+     */
+    public Object nextKey(Object reusable) throws IOException {
+      return data.nextKey(reusable);
+    }
+
+    /**
+     * Get the current value in the map.
+     * @param reusable an object that may be re-used for holding the value
+     * @return the value that was read in
+     * @throws IOException
+     */
+    public Object getCurrentValue(Object reusable) throws IOException {
+      return data.getCurrentValue(reusable);
+    }
+
+    /**
+     * Return the value for the named key, or null if none exists.
+     * @param key the key to look for
+     * @param value an object to read into
+     * @return the value that was found or null if the key wasn't found
+     * @throws IOException
+     * @deprecated Use {@link #seek} and {@link #getCurrentValue} instead.
+     */
+    @SuppressWarnings("unchecked")
+    @Deprecated
+    public synchronized Writable get(WritableComparable key,
+                                     Writable value) throws IOException {
+      if (seek(key)) {
+        return (Writable) data.getCurrentValue(value);
+      } else {
+        return null;
+      }
+    }
 
     /** Return the value for the named key, or null if none exists. */
-    public synchronized Writable get(WritableComparable key, Writable val)
-      throws IOException {
+    public synchronized Object get(Object key, Object val) throws IOException{
       if (seek(key)) {
-        data.getCurrentValue(val);
-        return val;
+        return data.getCurrentValue(val);
       } else
         return null;
     }
@@ -689,9 +801,8 @@ public class MapFile {
 -     * @param val       - data value if key is found
 -     * @return          - the key that was the closest match or null if eof.
      */
-    public synchronized WritableComparable getClosest(WritableComparable key,
-      Writable val)
-    throws IOException {
+    public Object getClosest(Object key,
+                             Object val)  throws IOException {
       return getClosest(key, val, false);
     }
 
@@ -705,9 +816,10 @@ public class MapFile {
      * return the record that sorts just after.
      * @return          - the key that was the closest match or null if eof.
      */
-    public synchronized WritableComparable getClosest(WritableComparable key,
-        Writable val, final boolean before)
-      throws IOException {
+    public synchronized Object getClosest(Object key,
+                                          Object val, 
+                                          final boolean before
+                                          ) throws IOException {
      
       int c = seekInternal(key, before);
 
@@ -720,7 +832,9 @@ public class MapFile {
       }
 
       data.getCurrentValue(val);
-      return nextKey;
+      // deserialize the key
+      inBuf.reset(nextKey.getData(), 0, nextKey.getLength());
+      return keySerialization.deserialize(inBuf, null, conf);
     }
 
     /** Close the map. */
@@ -764,17 +878,24 @@ public class MapFile {
    * @return number of valid entries in this MapFile, or -1 if no fixing was needed
    * @throws Exception
    */
+  @SuppressWarnings("unchecked")
   public static long fix(FileSystem fs, Path dir,
-                         Class<? extends Writable> keyClass,
-                         Class<? extends Writable> valueClass, boolean dryrun,
-                         Configuration conf) throws Exception {
+                         Class<?> keyClass,
+                         Class<?> valueClass, boolean dryrun,
+                         Configuration conf) throws IOException {
     String dr = (dryrun ? "[DRY RUN ] " : "");
     Path data = new Path(dir, DATA_FILE_NAME);
     Path index = new Path(dir, INDEX_FILE_NAME);
     int indexInterval = conf.getInt(Writer.INDEX_INTERVAL, 128);
+    SerializationFactory factory = SerializationFactory.getInstance(conf);
+    Serialization<Object> keySerialization = (Serialization<Object>)
+      factory.getSerializationByType(keyClass);
+    Serialization<Object> valueSerialization = (Serialization<Object>) 
+      factory.getSerializationByType(valueClass);
     if (!fs.exists(data)) {
       // there's nothing we can do to fix this!
-      throw new Exception(dr + "Missing data file in " + dir + ", impossible to fix this.");
+      throw new IOException(dr + "Missing data file in " + dir + 
+                            ", impossible to fix this.");
     }
     if (fs.exists(index)) {
       // no fixing needed
@@ -782,17 +903,17 @@ public class MapFile {
     }
     SequenceFile.Reader dataReader = 
       new SequenceFile.Reader(conf, SequenceFile.Reader.file(data));
-    if (!dataReader.getKeyClass().equals(keyClass)) {
-      throw new Exception(dr + "Wrong key class in " + dir + ", expected" + keyClass.getName() +
-                          ", got " + dataReader.getKeyClass().getName());
-    }
-    if (!dataReader.getValueClass().equals(valueClass)) {
-      throw new Exception(dr + "Wrong value class in " + dir + ", expected" + valueClass.getName() +
-                          ", got " + dataReader.getValueClass().getName());
+    if (!dataReader.getKeySerialization().equals(keySerialization)) {
+      throw new IOException(dr + "Wrong key serialization in " + dir + 
+                            ", expected" + keySerialization +
+                            ", got " + dataReader.getKeySerialization());
+    }
+    if (!dataReader.getValueSerialization().equals(valueSerialization)) {
+      throw new IOException(dr + "Wrong value serialization in " + dir + 
+                            ", expected" + valueSerialization +
+                            ", got " + dataReader.getValueSerialization());
     }
     long cnt = 0L;
-    Writable key = ReflectionUtils.newInstance(keyClass, conf);
-    Writable value = ReflectionUtils.newInstance(valueClass, conf);
     SequenceFile.Writer indexWriter = null;
     if (!dryrun) {
       indexWriter = 
@@ -805,7 +926,10 @@ public class MapFile {
     try {
       long pos = 0L;
       LongWritable position = new LongWritable();
-      while(dataReader.next(key, value)) {
+      Object key = null;
+      Object value = null;
+      while((key = dataReader.nextKey(key)) != null) {
+        value = dataReader.getCurrentValue(value);
         cnt++;
         if (cnt % indexInterval == 0) {
           position.set(pos);
@@ -834,21 +958,21 @@ public class MapFile {
     String out = args[1];
 
     Configuration conf = new Configuration();
-    FileSystem fs = FileSystem.getLocal(conf);
-    MapFile.Reader reader = new MapFile.Reader(fs, in, conf);
+    MapFile.Reader reader = new MapFile.Reader(new Path(in), conf);
+    Serialization<?> keySerialization = reader.getKeySerialization();
+    Serialization<?> valueSerialization = reader.getValueSerialization();
     MapFile.Writer writer =
-      new MapFile.Writer(conf, fs, out,
-          reader.getKeyClass().asSubclass(WritableComparable.class),
-          reader.getValueClass());
-
-    WritableComparable key =
-      ReflectionUtils.newInstance(reader.getKeyClass().asSubclass(WritableComparable.class), conf);
-    Writable value =
-      ReflectionUtils.newInstance(reader.getValueClass().asSubclass(Writable.class), conf);
+      new MapFile.Writer(conf, new Path(out), 
+                         Writer.keySerialization(keySerialization),
+                         Writer.valueSerialization(valueSerialization));
 
-    while (reader.next(key, value))               // copy all entries
-      writer.append(key, value);
+    Object key = null;
+    Object value = null;
 
+    while ((key = reader.nextKey(key)) != null) {          // copy all entries
+      value = reader.getCurrentValue(value);
+      writer.append(key, value);
+    }
     writer.close();
   }
 

Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/RawComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/RawComparator.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/RawComparator.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/RawComparator.java Sat Dec  4 07:13:10 2010
@@ -22,7 +22,6 @@ import java.util.Comparator;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.serializer.DeserializerComparator;
 
 /**
  * <p>
@@ -30,12 +29,15 @@ import org.apache.hadoop.io.serializer.D
  * objects.
  * </p>
  * @param <T>
- * @see DeserializerComparator
+ * @deprecated Use {@link org.apache.hadoop.io.serial.RawComparator} instead.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public interface RawComparator<T> extends Comparator<T> {
+@Deprecated
+public interface RawComparator<T> 
+     extends Comparator<T>, org.apache.hadoop.io.serial.RawComparator {
 
+  @Override
   public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
 
 }