You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2010/12/04 08:13:12 UTC
svn commit: r1042107 [1/6] - in /hadoop/common/branches/HADOOP-6685: ./ ivy/
src/java/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/io/
src/java/org/apache/hadoop/io/file/tfile/
src/java/org/apache/hadoop/io/serial/ src/java/org/apache/had...
Author: omalley
Date: Sat Dec 4 07:13:10 2010
New Revision: 1042107
URL: http://svn.apache.org/viewvc?rev=1042107&view=rev
Log:
HADOOP-6685. Add new generic serialization interface.
Added:
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/RawComparator.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/Serialization.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/SerializationFactory.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/TypedSerialization.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/CompatibilitySerialization.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/DeserializationRawComparator.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/JavaSerialization.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/MemcmpRawComparator.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/SerializationMetadata.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/WritableSerialization.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/avro/
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/avro/AvroComparator.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/avro/AvroReflectSerializable.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/avro/AvroSerialization.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/avro/package.html
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/protobuf/
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/protobuf/ProtoBufComparator.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/protobuf/ProtoBufSerialization.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/thrift/
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/thrift/StreamTransport.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/thrift/ThriftSerialization.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/package-info.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serializer/package-info.java
hadoop/common/branches/HADOOP-6685/src/protobuf/
hadoop/common/branches/HADOOP-6685/src/protobuf/SerializationMetadata.proto
hadoop/common/branches/HADOOP-6685/src/test/core/org/apache/hadoop/io/AvroKey.java
hadoop/common/branches/HADOOP-6685/src/test/core/org/apache/hadoop/io/AvroValue.java
hadoop/common/branches/HADOOP-6685/src/test/core/org/apache/hadoop/io/ProtoTest.java
hadoop/common/branches/HADOOP-6685/src/test/core/org/apache/hadoop/io/ProtoTest.proto
hadoop/common/branches/HADOOP-6685/src/test/core/org/apache/hadoop/io/ThriftKey.java
hadoop/common/branches/HADOOP-6685/src/test/core/org/apache/hadoop/io/ThriftValue.java
hadoop/common/branches/HADOOP-6685/src/test/core/org/apache/hadoop/io/test.genavro
hadoop/common/branches/HADOOP-6685/src/test/core/org/apache/hadoop/io/test.thrift
Removed:
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serializer/package.html
Modified:
hadoop/common/branches/HADOOP-6685/build.xml
hadoop/common/branches/HADOOP-6685/ivy.xml
hadoop/common/branches/HADOOP-6685/ivy/hadoop-common-template.xml
hadoop/common/branches/HADOOP-6685/ivy/ivysettings.xml
hadoop/common/branches/HADOOP-6685/ivy/libraries.properties
hadoop/common/branches/HADOOP-6685/src/java/core-default.xml
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/ArrayFile.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/BloomMapFile.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/DataInputBuffer.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/DefaultStringifier.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/MapFile.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/RawComparator.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/SequenceFile.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/SetFile.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/BCFile.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/CompareUtils.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/Compression.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/RawComparable.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/TFile.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/TFileDumper.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/Utils.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serializer/Deserializer.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serializer/DeserializerComparator.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serializer/JavaSerialization.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serializer/JavaSerializationComparator.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serializer/Serialization.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serializer/SerializationFactory.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serializer/Serializer.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serializer/WritableSerialization.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerializable.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/security/SaslRpcServer.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/util/Options.java
hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/util/ReflectionUtils.java
hadoop/common/branches/HADOOP-6685/src/test/core/org/apache/hadoop/io/RandomDatum.java
hadoop/common/branches/HADOOP-6685/src/test/core/org/apache/hadoop/io/TestDefaultStringifier.java
hadoop/common/branches/HADOOP-6685/src/test/core/org/apache/hadoop/io/TestMapFile.java
hadoop/common/branches/HADOOP-6685/src/test/core/org/apache/hadoop/io/TestSequenceFileSerialization.java
hadoop/common/branches/HADOOP-6685/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileJClassComparatorByteArrays.java
hadoop/common/branches/HADOOP-6685/src/test/findbugsExcludeFile.xml
Modified: hadoop/common/branches/HADOOP-6685/build.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/build.xml?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/build.xml (original)
+++ hadoop/common/branches/HADOOP-6685/build.xml Sat Dec 4 07:13:10 2010
@@ -96,7 +96,7 @@
<property name="test.all.tests.file" value="${test.src.dir}/all-tests"/>
<property name="javadoc.link.java"
- value="http://java.sun.com/javase/6/docs/api/"/>
+ value="http://download.oracle.com/javase/6/docs/api"/>
<property name="javadoc.packages" value="org.apache.hadoop.*"/>
<property name="javadoc.maxmemory" value="512m" />
Modified: hadoop/common/branches/HADOOP-6685/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/ivy.xml?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/ivy.xml (original)
+++ hadoop/common/branches/HADOOP-6685/ivy.xml Sat Dec 4 07:13:10 2010
@@ -269,6 +269,22 @@
<exclude module="jetty"/>
<exclude module="slf4j-simple"/>
</dependency>
+ <dependency org="com.google.protobuf"
+ name="protobuf-java"
+ rev="${protobuf.version}"
+ conf="common->default"/>
+ <dependency org="org.apache.hadoop"
+ name="libthrift"
+ rev="${thrift.version}"
+ conf="common->default">
+ <exclude module="servlet-api"/>
+ <exclude module="slf4j-api"/>
+ <exclude module="slf4j-log4j12"/>
+ </dependency>
+ <dependency org="org.yaml"
+ name="snakeyaml"
+ rev="${snakeyaml.version}"
+ conf="common->default"/>
<dependency org="org.codehaus.jackson"
name="jackson-mapper-asl"
rev="${jackson.version}"
Modified: hadoop/common/branches/HADOOP-6685/ivy/hadoop-common-template.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/ivy/hadoop-common-template.xml?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/ivy/hadoop-common-template.xml (original)
+++ hadoop/common/branches/HADOOP-6685/ivy/hadoop-common-template.xml Sat Dec 4 07:13:10 2010
@@ -119,6 +119,21 @@
<version>2.0.8</version>
</dependency>
<dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>2.3.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>libthrift</artifactId>
+ <version>0.5.0.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.yaml</groupId>
+ <artifactId>snakeyaml</artifactId>
+ <version>1.7</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>avro</artifactId>
<version>1.3.2</version>
Modified: hadoop/common/branches/HADOOP-6685/ivy/ivysettings.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/ivy/ivysettings.xml?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/ivy/ivysettings.xml (original)
+++ hadoop/common/branches/HADOOP-6685/ivy/ivysettings.xml Sat Dec 4 07:13:10 2010
@@ -30,6 +30,9 @@
<resolvers>
<!--ibiblio resolvers-->
<ibiblio name="maven2" root="${repo.maven.org}" m2compatible="true"/>
+ <ibiblio name="apache"
+ root="https://repository.apache.org/content/repositories/releases"
+ m2compatible="true"/>
<filesystem name="fs" m2compatible="true" force="true">
<artifact pattern="${repo.dir}/[organisation]/[module]/[revision]/[module]-[revision].[ext]"/>
@@ -37,6 +40,7 @@
</filesystem>
<chain name="default" dual="true">
+ <resolver ref="apache"/>
<resolver ref="maven2"/>
</chain>
Modified: hadoop/common/branches/HADOOP-6685/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/ivy/libraries.properties?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/ivy/libraries.properties (original)
+++ hadoop/common/branches/HADOOP-6685/ivy/libraries.properties Sat Dec 4 07:13:10 2010
@@ -62,6 +62,8 @@ mina-core.version=2.0.0-M5
oro.version=2.0.8
+protobuf.version=2.3.0
+
rats-lib.version=0.6
servlet.version=4.0.6
@@ -69,6 +71,9 @@ servlet-api-2.5.version=6.1.14
servlet-api.version=2.5
slf4j-api.version=1.5.11
slf4j-log4j12.version=1.5.11
+snakeyaml.version=1.7
+
+thrift.version=0.5.0.0
wagon-http.version=1.0-beta-2
Modified: hadoop/common/branches/HADOOP-6685/src/java/core-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/core-default.xml?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/core-default.xml (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/core-default.xml Sat Dec 4 07:13:10 2010
@@ -155,8 +155,8 @@
</property>
<property>
- <name>io.serializations</name>
- <value>org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,org.apache.hadoop.io.serializer.avro.AvroReflectSerialization</value>
+ <name>hadoop.serializations</name>
+ <value>org.apache.hadoop.io.serial.lib.WritableSerialization,org.apache.hadoop.io.serial.lib.protobuf.ProtoBufSerialization,org.apache.hadoop.io.serial.lib.thrift.ThriftSerialization,org.apache.hadoop.io.serial.lib.avro.AvroSerialization,org.apache.hadoop.io.serial.lib.CompatibilitySerialization</value>
<description>A list of serialization classes that can be used for
obtaining serializers and deserializers.</description>
</property>
Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java Sat Dec 4 07:13:10 2010
@@ -138,9 +138,11 @@ public class CommonConfigurationKeysPubl
public static final String IO_SORT_FACTOR_KEY = "io.sort.factor";
/** Default value for IO_SORT_FACTOR_DEFAULT */
public static final int IO_SORT_FACTOR_DEFAULT = 100;
- /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
+ /** Defines the list of the deprecated serializations. */
public static final String IO_SERIALIZATIONS_KEY = "io.serializations";
-
+ /** Defines the list of serializations */
+ public static final String HADOOP_SERIALIZATIONS_KEY = "hadoop.serializations";
+
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
public static final String TFILE_IO_CHUNK_SIZE_KEY = "tfile.io.chunk.size";
/** Default value for TFILE_IO_CHUNK_SIZE_DEFAULT */
Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/ArrayFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/ArrayFile.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/ArrayFile.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/ArrayFile.java Sat Dec 4 07:13:10 2010
@@ -40,7 +40,7 @@ public class ArrayFile extends MapFile {
/** Create the named file for values of the named class. */
public Writer(Configuration conf, FileSystem fs,
- String file, Class<? extends Writable> valClass)
+ String file, Class<?> valClass)
throws IOException {
super(conf, new Path(file), keyClass(LongWritable.class),
valueClass(valClass));
@@ -48,7 +48,7 @@ public class ArrayFile extends MapFile {
/** Create the named file for values of the named class. */
public Writer(Configuration conf, FileSystem fs,
- String file, Class<? extends Writable> valClass,
+ String file, Class<?> valClass,
CompressionType compress, Progressable progress)
throws IOException {
super(conf, new Path(file),
@@ -59,7 +59,7 @@ public class ArrayFile extends MapFile {
}
/** Append a value to the file. */
- public synchronized void append(Writable value) throws IOException {
+ public synchronized void append(Object value) throws IOException {
super.append(count, value); // add to map
count.set(count.get()+1); // increment count
}
@@ -81,20 +81,31 @@ public class ArrayFile extends MapFile {
seek(key);
}
- /** Read and return the next value in the file. */
+ @Deprecated
public synchronized Writable next(Writable value) throws IOException {
- return next(key, value) ? value : null;
+ return (Writable) next((Object) value);
+ }
+
+ /** Read and return the next value in the file. */
+ public synchronized Object next(Object value) throws IOException {
+ key = (LongWritable) nextKey(key);
+ return key == null? null : getCurrentValue(value);
}
/** Returns the key associated with the most recent call to {@link
- * #seek(long)}, {@link #next(Writable)}, or {@link
- * #get(long,Writable)}. */
+ * #seek(long)}, {@link #next(Object)}, or {@link
+ * #get(long,Object)}. */
public synchronized long key() throws IOException {
return key.get();
}
+ @Deprecated
+ public synchronized Writable get(long n, Writable value) throws IOException{
+ return (Writable) get(n, (Object) value);
+ }
+
/** Return the <code>n</code>th value in the file. */
- public synchronized Writable get(long n, Writable value)
+ public synchronized Object get(long n, Object value)
throws IOException {
key.set(n);
return get(key, value);
Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/BloomMapFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/BloomMapFile.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/BloomMapFile.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/BloomMapFile.java Sat Dec 4 07:13:10 2010
@@ -31,7 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.util.Options;
+import org.apache.hadoop.io.serial.Serialization;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.bloom.DynamicBloomFilter;
import org.apache.hadoop.util.bloom.Filter;
@@ -42,7 +42,7 @@ import org.apache.hadoop.util.hash.Hash;
* This class extends {@link MapFile} and provides very much the same
* functionality. However, it uses dynamic Bloom filters to provide
* quick membership test for keys, and it offers a fast version of
- * {@link Reader#get(WritableComparable, Writable)} operation, especially in
+ * {@link Reader#get(Object, Object)} operation, especially in
* case of sparsely populated MapFile-s.
*/
@InterfaceAudience.Public
@@ -82,7 +82,9 @@ public class BloomMapFile {
private DataOutputBuffer buf = new DataOutputBuffer();
private FileSystem fs;
private Path dir;
+ private final Serialization<Object> keySerialization;
+ @SuppressWarnings("unchecked")
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
Class<? extends WritableComparable> keyClass,
@@ -92,6 +94,7 @@ public class BloomMapFile {
compression(compress, codec), progressable(progress));
}
+ @SuppressWarnings("unchecked")
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
Class<? extends WritableComparable> keyClass,
@@ -101,6 +104,7 @@ public class BloomMapFile {
compression(compress), progressable(progress));
}
+ @SuppressWarnings("unchecked")
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
Class<? extends WritableComparable> keyClass,
@@ -110,6 +114,7 @@ public class BloomMapFile {
compression(compress));
}
+ @SuppressWarnings("unchecked")
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
WritableComparator comparator, Class valClass,
@@ -120,6 +125,7 @@ public class BloomMapFile {
progressable(progress));
}
+ @SuppressWarnings("unchecked")
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
WritableComparator comparator, Class valClass,
@@ -129,6 +135,7 @@ public class BloomMapFile {
progressable(progress));
}
+ @SuppressWarnings("unchecked")
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
WritableComparator comparator, Class valClass, CompressionType compress)
@@ -137,6 +144,7 @@ public class BloomMapFile {
valueClass(valClass), compression(compress));
}
+ @SuppressWarnings("unchecked")
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
WritableComparator comparator, Class valClass) throws IOException {
@@ -144,6 +152,7 @@ public class BloomMapFile {
valueClass(valClass));
}
+ @SuppressWarnings("unchecked")
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
Class<? extends WritableComparable> keyClass,
@@ -151,12 +160,14 @@ public class BloomMapFile {
this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass));
}
+ @SuppressWarnings("unchecked")
public Writer(Configuration conf, Path dir,
SequenceFile.Writer.Option... options) throws IOException {
super(conf, dir, options);
this.fs = dir.getFileSystem(conf);
this.dir = dir;
initBloomFilter(conf);
+ keySerialization = (Serialization<Object>) getKeySerialization();
}
private synchronized void initBloomFilter(Configuration conf) {
@@ -174,11 +185,10 @@ public class BloomMapFile {
}
@Override
- public synchronized void append(WritableComparable key, Writable val)
- throws IOException {
+ public synchronized void append(Object key, Object val) throws IOException {
super.append(key, val);
buf.reset();
- key.write(buf);
+ keySerialization.serialize(buf, key);
bloomKey.set(byteArrayForBloomKey(buf), 1.0);
bloomFilter.add(bloomKey);
}
@@ -198,11 +208,14 @@ public class BloomMapFile {
private DynamicBloomFilter bloomFilter;
private DataOutputBuffer buf = new DataOutputBuffer();
private Key bloomKey = new Key();
+ private final Serialization<Object> keySerialization;
+ @SuppressWarnings("unchecked")
public Reader(Path dir, Configuration conf,
SequenceFile.Reader.Option... options) throws IOException {
super(dir, conf, options);
initBloomFilter(dir, conf);
+ keySerialization = (Serialization<Object>) getKeySerialization();
}
@Deprecated
@@ -245,26 +258,40 @@ public class BloomMapFile {
* @return false iff key doesn't exist, true if key probably exists.
* @throws IOException
*/
- public boolean probablyHasKey(WritableComparable key) throws IOException {
+ public boolean probablyHasKey(Object key) throws IOException {
if (bloomFilter == null) {
return true;
}
buf.reset();
- key.write(buf);
+ keySerialization.serialize(buf, key);
bloomKey.set(byteArrayForBloomKey(buf), 1.0);
return bloomFilter.membershipTest(bloomKey);
}
/**
* Fast version of the
- * {@link MapFile.Reader#get(WritableComparable, Writable)} method. First
+ * {@link MapFile.Reader#get(Object, Object)} method. First
* it checks the Bloom filter for the existence of the key, and only if
* present it performs the real get operation. This yields significant
* performance improvements for get operations on sparsely populated files.
*/
+ @SuppressWarnings("unchecked")
+ @Deprecated
@Override
- public synchronized Writable get(WritableComparable key, Writable val)
- throws IOException {
+ public synchronized Writable get(WritableComparable key,
+ Writable value) throws IOException {
+ return (Writable) get((Object) key, (Object) value);
+ }
+
+ /**
+ * Fast version of the
+ * {@link MapFile.Reader#get(Object, Object)} method. First
+ * it checks the Bloom filter for the existence of the key, and only if
+ * present it performs the real get operation. This yields significant
+ * performance improvements for get operations on sparsely populated files.
+ */
+ @Override
+ public synchronized Object get(Object key, Object val) throws IOException {
if (!probablyHasKey(key)) {
return null;
}
Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/DataInputBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/DataInputBuffer.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/DataInputBuffer.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/DataInputBuffer.java Sat Dec 4 07:13:10 2010
@@ -93,4 +93,20 @@ public class DataInputBuffer extends Dat
/** Returns the length of the input. */
public int getLength() { return buffer.getLength(); }
+ public String toString() {
+ StringBuilder sb = new StringBuilder(3 * buffer.getLength() + 10);
+ byte[] bytes = getData();
+ for(int i=0; i < buffer.getLength(); i++) {
+ sb.append(' ');
+ String num = Integer.toHexString(0xff & bytes[i]);
+ // if it is only one digit, add a leading 0.
+ if (num.length() < 2) {
+ sb.append('0');
+ }
+ sb.append(num);
+ }
+ sb.append("; pos=");
+ sb.append(buffer.getPosition());
+ return sb.toString();
+ }
}
Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/DefaultStringifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/DefaultStringifier.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/DefaultStringifier.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/DefaultStringifier.java Sat Dec 4 07:13:10 2010
@@ -26,17 +26,15 @@ import org.apache.commons.codec.binary.B
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.Serialization;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.io.serial.Serialization;
+import org.apache.hadoop.io.serial.SerializationFactory;
import org.apache.hadoop.util.GenericsUtil;
/**
* DefaultStringifier is the default implementation of the {@link Stringifier}
* interface which stringifies the objects using base64 encoding of the
- * serialized version of the objects. The {@link Serializer} and
- * {@link Deserializer} are obtained from the {@link SerializationFactory}.
+ * serialized version of the objects. The {@link Serialization}
+ * is obtained from the {@link SerializationFactory}.
* <br>
* DefaultStringifier offers convenience methods to store/load objects to/from
* the configuration.
@@ -49,43 +47,37 @@ public class DefaultStringifier<T> imple
private static final String SEPARATOR = ",";
- private Serializer<T> serializer;
+ private final Serialization<T> serialization;
- private Deserializer<T> deserializer;
+ private final DataInputBuffer inBuf;
- private DataInputBuffer inBuf;
-
- private DataOutputBuffer outBuf;
+ private final DataOutputBuffer outBuf;
+ private final Configuration conf;
+ @SuppressWarnings("unchecked")
public DefaultStringifier(Configuration conf, Class<T> c) {
- SerializationFactory factory = new SerializationFactory(conf);
- this.serializer = factory.getSerializer(c);
- this.deserializer = factory.getDeserializer(c);
+ SerializationFactory factory = SerializationFactory.getInstance(conf);
this.inBuf = new DataInputBuffer();
this.outBuf = new DataOutputBuffer();
- try {
- serializer.open(outBuf);
- deserializer.open(inBuf);
- } catch (IOException ex) {
- throw new RuntimeException(ex);
- }
+ this.conf = conf;
+ this.serialization = (Serialization<T>) factory.getSerializationByType(c);
}
public T fromString(String str) throws IOException {
try {
byte[] bytes = Base64.decodeBase64(str.getBytes("UTF-8"));
inBuf.reset(bytes, bytes.length);
- T restored = deserializer.deserialize(null);
+ T restored = serialization.deserialize(inBuf, null, conf);
return restored;
} catch (UnsupportedCharsetException ex) {
- throw new IOException(ex.toString());
+ throw new IOException("problem finding utf-8", ex);
}
}
public String toString(T obj) throws IOException {
outBuf.reset();
- serializer.serialize(obj);
+ serialization.serialize(outBuf, obj);
byte[] buf = new byte[outBuf.getLength()];
System.arraycopy(outBuf.getData(), 0, buf, 0, buf.length);
return new String(Base64.encodeBase64(buf));
@@ -94,8 +86,6 @@ public class DefaultStringifier<T> imple
public void close() throws IOException {
inBuf.close();
outBuf.close();
- deserializer.close();
- serializer.close();
}
/**
Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/MapFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/MapFile.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/MapFile.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/MapFile.java Sat Dec 4 07:13:10 2010
@@ -18,24 +18,25 @@
package org.apache.hadoop.io;
+import java.io.EOFException;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.io.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.Options;
-import org.apache.hadoop.fs.*;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.serial.RawComparator;
+import org.apache.hadoop.io.serial.Serialization;
+import org.apache.hadoop.io.serial.SerializationFactory;
+import org.apache.hadoop.util.Progressable;
/** A file-based map from keys to values.
*
@@ -68,8 +69,11 @@ public class MapFile {
/** Writes a new map. */
public static class Writer implements java.io.Closeable {
- private SequenceFile.Writer data;
- private SequenceFile.Writer index;
+ private final SequenceFile.Writer data;
+ private final SequenceFile.Writer index;
+ private final Configuration conf;
+ private final Serialization<Object> keySerialization;
+ private final Serialization<Object> valueSerialization;
final private static String INDEX_INTERVAL = "io.map.index.interval";
private int indexInterval = 128;
@@ -78,10 +82,11 @@ public class MapFile {
private LongWritable position = new LongWritable();
// the following fields are used only for checking key order
- private WritableComparator comparator;
- private DataInputBuffer inBuf = new DataInputBuffer();
- private DataOutputBuffer outBuf = new DataOutputBuffer();
- private WritableComparable lastKey;
+ private final RawComparator comparator;
+ private final DataInputBuffer inBuf = new DataInputBuffer();
+ private DataOutputBuffer lastKey;
+ private final DataOutputBuffer currentKey = new DataOutputBuffer();
+ private final DataOutputBuffer currentValue = new DataOutputBuffer();
/** What's the position (in bytes) we wrote when we got the last index */
private long lastIndexPos = -1;
@@ -97,6 +102,7 @@ public class MapFile {
/** Create the named map for keys of the named class.
* @deprecated Use Writer(Configuration, Path, Option...) instead.
*/
+ @SuppressWarnings("unchecked")
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
Class<? extends WritableComparable> keyClass,
@@ -107,6 +113,7 @@ public class MapFile {
/** Create the named map for keys of the named class.
* @deprecated Use Writer(Configuration, Path, Option...) instead.
*/
+ @SuppressWarnings("unchecked")
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
Class<? extends WritableComparable> keyClass, Class valClass,
@@ -119,6 +126,7 @@ public class MapFile {
/** Create the named map for keys of the named class.
* @deprecated Use Writer(Configuration, Path, Option...) instead.
*/
+ @SuppressWarnings("unchecked")
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
Class<? extends WritableComparable> keyClass, Class valClass,
@@ -131,6 +139,7 @@ public class MapFile {
/** Create the named map for keys of the named class.
* @deprecated Use Writer(Configuration, Path, Option...) instead.
*/
+ @SuppressWarnings("unchecked")
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
Class<? extends WritableComparable> keyClass, Class valClass,
@@ -142,6 +151,7 @@ public class MapFile {
/** Create the named map using the named key comparator.
* @deprecated Use Writer(Configuration, Path, Option...) instead.
*/
+ @SuppressWarnings("unchecked")
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
WritableComparator comparator, Class valClass
@@ -153,6 +163,7 @@ public class MapFile {
/** Create the named map using the named key comparator.
* @deprecated Use Writer(Configuration, Path, Option...) instead.
*/
+ @SuppressWarnings("unchecked")
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
WritableComparator comparator, Class valClass,
@@ -164,6 +175,7 @@ public class MapFile {
/** Create the named map using the named key comparator.
* @deprecated Use Writer(Configuration, Path, Option...)} instead.
*/
+ @SuppressWarnings("unchecked")
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
WritableComparator comparator, Class valClass,
@@ -177,6 +189,7 @@ public class MapFile {
/** Create the named map using the named key comparator.
* @deprecated Use Writer(Configuration, Path, Option...) instead.
*/
+ @SuppressWarnings("unchecked")
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
WritableComparator comparator, Class valClass,
@@ -190,28 +203,18 @@ public class MapFile {
// our options are a superset of sequence file writer options
public static interface Option extends SequenceFile.Writer.Option { }
- private static class KeyClassOption extends Options.ClassOption
- implements Option {
- KeyClassOption(Class<?> value) {
+ private static class ComparatorOption extends Options.ComparatorOption
+ implements Option{
+ ComparatorOption(RawComparator value) {
super(value);
}
}
-
- private static class ComparatorOption implements Option {
- private final WritableComparator value;
- ComparatorOption(WritableComparator value) {
- this.value = value;
- }
- WritableComparator getValue() {
- return value;
- }
- }
- public static Option keyClass(Class<? extends WritableComparable> value) {
- return new KeyClassOption(value);
+ public static SequenceFile.Writer.Option keyClass(Class<?> value) {
+ return new SequenceFile.Writer.KeyClassOption(value);
}
- public static Option comparator(WritableComparator value) {
+ public static Option comparator(RawComparator value) {
return new ComparatorOption(value);
}
@@ -234,31 +237,27 @@ public class MapFile {
return SequenceFile.Writer.progressable(value);
}
+ public static
+ SequenceFile.Writer.Option keySerialization(Serialization<?> value) {
+ return SequenceFile.Writer.keySerialization(value);
+ }
+
+ public static
+ SequenceFile.Writer.Option valueSerialization(Serialization<?> value) {
+ return SequenceFile.Writer.valueSerialization(value);
+ }
+
@SuppressWarnings("unchecked")
public Writer(Configuration conf,
Path dirName,
SequenceFile.Writer.Option... opts
) throws IOException {
- KeyClassOption keyClassOption =
- Options.getOption(KeyClassOption.class, opts);
+ this.conf = conf;
ComparatorOption comparatorOption =
Options.getOption(ComparatorOption.class, opts);
- if ((keyClassOption == null) == (comparatorOption == null)) {
- throw new IllegalArgumentException("key class or comparator option "
- + "must be set");
- }
+
this.indexInterval = conf.getInt(INDEX_INTERVAL, this.indexInterval);
- Class<? extends WritableComparable> keyClass;
- if (keyClassOption == null) {
- this.comparator = comparatorOption.getValue();
- keyClass = comparator.getKeyClass();
- } else {
- keyClass=
- (Class<? extends WritableComparable>) keyClassOption.getValue();
- this.comparator = WritableComparator.get(keyClass);
- }
- this.lastKey = comparator.newKey();
FileSystem fs = dirName.getFileSystem(conf);
if (!fs.mkdirs(dirName)) {
@@ -269,13 +268,18 @@ public class MapFile {
SequenceFile.Writer.Option[] dataOptions =
Options.prependOptions(opts,
- SequenceFile.Writer.file(dataFile),
- SequenceFile.Writer.keyClass(keyClass));
+ SequenceFile.Writer.file(dataFile));
this.data = SequenceFile.createWriter(conf, dataOptions);
+ keySerialization = (Serialization<Object>) data.getKeySerialization();
+ valueSerialization = (Serialization<Object>) data.getValueSerialization();
+ if (comparatorOption != null) {
+ comparator = comparatorOption.getValue();
+ } else {
+ comparator = keySerialization.getRawComparator();
+ }
SequenceFile.Writer.Option[] indexOptions =
Options.prependOptions(opts, SequenceFile.Writer.file(indexFile),
- SequenceFile.Writer.keyClass(keyClass),
SequenceFile.Writer.valueClass(LongWritable.class),
SequenceFile.Writer.compression(CompressionType.BLOCK));
this.index = SequenceFile.createWriter(conf, indexOptions);
@@ -296,6 +300,22 @@ public class MapFile {
conf.setInt(INDEX_INTERVAL, interval);
}
+ /**
+ * Get the serialization used for the keys
+ * @return the key serialization
+ */
+ public Serialization<?> getKeySerialization() {
+ return data.getKeySerialization();
+ }
+
+ /**
+ * Get the serialization used for the values
+ * @return the value serialization
+ */
+ public Serialization<?> getValueSerialization() {
+ return data.getValueSerialization();
+ }
+
/** Close the map. */
public synchronized void close() throws IOException {
data.close();
@@ -304,10 +324,14 @@ public class MapFile {
/** Append a key/value pair to the map. The key must be greater or equal
* to the previous key added to the map. */
- public synchronized void append(WritableComparable key, Writable val)
+ public synchronized void append(Object key, Object val)
throws IOException {
- checkKey(key);
+ currentKey.reset();
+ keySerialization.serialize(currentKey, key);
+ checkKey(currentKey, key);
+ currentValue.reset();
+ valueSerialization.serialize(currentValue, val);
long pos = data.getLength();
// Only write an index if we've changed positions. In a block compressed
@@ -323,17 +347,21 @@ public class MapFile {
size++;
}
- private void checkKey(WritableComparable key) throws IOException {
+ private void checkKey(DataOutputBuffer serialKey, Object key
+ ) throws IOException {
// check that keys are well-ordered
- if (size != 0 && comparator.compare(lastKey, key) > 0)
- throw new IOException("key out of order: "+key+" after "+lastKey);
-
- // update lastKey with a copy of key by writing and reading
- outBuf.reset();
- key.write(outBuf); // write new key
-
- inBuf.reset(outBuf.getData(), outBuf.getLength());
- lastKey.readFields(inBuf); // read into lastKey
+ if (lastKey == null) {
+ lastKey = new DataOutputBuffer();
+ } else if (comparator.compare(lastKey.getData(), 0, lastKey.getLength(),
+ serialKey.getData(),0,serialKey.getLength())
+ > 0) {
+ // rebuild the previous key so that we can explain what's wrong
+ inBuf.reset(lastKey.getData(), 0, lastKey.getLength());
+ Object prevKey = keySerialization.deserialize(inBuf, null, conf);
+ throw new IOException("key out of order: "+ key +" after "+ prevKey);
+ }
+ lastKey.reset();
+ lastKey.write(serialKey.getData(), 0, serialKey.getLength());
}
}
@@ -346,9 +374,12 @@ public class MapFile {
* files using less memory. */
private int INDEX_SKIP = 0;
- private WritableComparator comparator;
+ private RawComparator comparator;
+ private Serialization<Object> keySerialization;
+ private final Configuration conf;
- private WritableComparable nextKey;
+ private DataOutputBuffer nextKey = new DataOutputBuffer();
+ private DataInputBuffer inBuf = new DataInputBuffer();
private long seekPosition = -1;
private int seekIndex = -1;
private long firstPosition;
@@ -362,36 +393,55 @@ public class MapFile {
// the index, in memory
private int count = -1;
- private WritableComparable[] keys;
+ private byte[][] keys;
private long[] positions;
- /** Returns the class of keys in this file. */
+ /** Returns the class of keys in this file.
+ * @deprecated Use {@link #getKeySerialization} instead.
+ */
+ @Deprecated
public Class<?> getKeyClass() { return data.getKeyClass(); }
- /** Returns the class of values in this file. */
+ /** Returns the class of values in this file.
+ * @deprecated Use {@link #getValueSerialization} instead.
+ */
+ @Deprecated
public Class<?> getValueClass() { return data.getValueClass(); }
+ /**
+ * Get the key serialization for this map file.
+ * @return the serialization for the key
+ */
+ public Serialization<?> getKeySerialization() {
+ return keySerialization;
+ }
+
+ /**
+ * Get the value serialization for this map file.
+ * @return the serialization for the value
+ */
+ public Serialization<?> getValueSerialization() {
+ return data.getValueSerialization();
+ }
public static interface Option extends SequenceFile.Reader.Option {}
public static Option comparator(WritableComparator value) {
return new ComparatorOption(value);
}
- static class ComparatorOption implements Option {
- private final WritableComparator value;
- ComparatorOption(WritableComparator value) {
- this.value = value;
- }
- WritableComparator getValue() {
- return value;
+ static class ComparatorOption extends Options.ComparatorOption
+ implements Option {
+ ComparatorOption(RawComparator value) {
+ super(value);
}
}
public Reader(Path dir, Configuration conf,
SequenceFile.Reader.Option... opts) throws IOException {
+ this.conf = conf;
ComparatorOption comparatorOption =
Options.getOption(ComparatorOption.class, opts);
- WritableComparator comparator =
+ RawComparator comparator =
comparatorOption == null ? null : comparatorOption.getValue();
INDEX_SKIP = conf.getInt("io.map.index.skip", 0);
open(dir, comparator, conf, opts);
@@ -415,8 +465,9 @@ public class MapFile {
this(new Path(dirName), conf, comparator(comparator));
}
+ @SuppressWarnings("unchecked")
protected synchronized void open(Path dir,
- WritableComparator comparator,
+ RawComparator comparator,
Configuration conf,
SequenceFile.Reader.Option... options
) throws IOException {
@@ -426,13 +477,13 @@ public class MapFile {
// open the data
this.data = createDataFileReader(dataFile, conf, options);
this.firstPosition = data.getPosition();
+ keySerialization = (Serialization<Object>) data.getKeySerialization();
- if (comparator == null)
- this.comparator =
- WritableComparator.get(data.getKeyClass().
- asSubclass(WritableComparable.class));
- else
+ if (comparator == null) {
+ this.comparator = keySerialization.getRawComparator();
+ } else {
this.comparator = comparator;
+ }
// open the index
SequenceFile.Reader.Option[] indexOptions =
@@ -463,19 +514,25 @@ public class MapFile {
try {
int skip = INDEX_SKIP;
LongWritable position = new LongWritable();
- WritableComparable lastKey = null;
+ byte[] lastKey = null;
long lastIndex = -1;
- ArrayList<WritableComparable> keyBuilder = new ArrayList<WritableComparable>(1024);
- while (true) {
- WritableComparable k = comparator.newKey();
-
- if (!index.next(k, position))
- break;
+ ArrayList<byte[]> keyBuilder = new ArrayList<byte[]>(1024);
+ DataOutputBuffer key = new DataOutputBuffer();
+ while (index.nextRawKey(key) > 0) {
+ position = (LongWritable) index.getCurrentValue(position);
// check order to make sure comparator is compatible
- if (lastKey != null && comparator.compare(lastKey, k) > 0)
- throw new IOException("key out of order: "+k+" after "+lastKey);
- lastKey = k;
+ if (lastKey != null &&
+ comparator.compare(lastKey, 0, lastKey.length,
+ key.getData(), 0 , key.getLength()) > 0) {
+ inBuf.reset(lastKey, 0, lastKey.length);
+ Object prevKey = keySerialization.deserialize(inBuf, null, conf);
+ inBuf.reset(key.getData(), 0, key.getLength());
+ Object curKey = keySerialization.deserialize(inBuf, null, conf);
+ throw new IOException("key out of order: "+ curKey + " after " +
+ prevKey);
+ }
+ lastKey = Arrays.copyOf(key.getData(), key.getLength());
if (skip > 0) {
skip--;
continue; // skip this entry
@@ -483,28 +540,28 @@ public class MapFile {
skip = INDEX_SKIP; // reset skip
}
- // don't read an index that is the same as the previous one. Block
- // compressed map files used to do this (multiple entries would point
- // at the same block)
- if (position.get() == lastIndex)
- continue;
+ // don't read an index that is the same as the previous one. Block
+ // compressed map files used to do this (multiple entries would point
+ // at the same block)
+ if (position.get() == lastIndex)
+ continue;
if (count == positions.length) {
- positions = Arrays.copyOf(positions, positions.length * 2);
+ positions = Arrays.copyOf(positions, positions.length * 2);
}
- keyBuilder.add(k);
+ keyBuilder.add(lastKey);
positions[count] = position.get();
count++;
}
- this.keys = keyBuilder.toArray(new WritableComparable[count]);
+ this.keys = keyBuilder.toArray(new byte[count][]);
positions = Arrays.copyOf(positions, count);
} catch (EOFException e) {
LOG.warn("Unexpected EOF reading " + index +
- " at entry #" + count + ". Ignoring.");
+ " at entry #" + count + ". Ignoring.");
} finally {
- indexClosed = true;
+ indexClosed = true;
index.close();
}
}
@@ -517,22 +574,23 @@ public class MapFile {
/** Get the key at approximately the middle of the file. Or null if the
* file is empty.
*/
- public synchronized WritableComparable midKey() throws IOException {
+ public synchronized Object midKey() throws IOException {
readIndex();
if (count == 0) {
return null;
}
- return keys[(count - 1) / 2];
+ byte[] rawKey = keys[(count -1) / 2];
+ inBuf.reset(rawKey, 0, rawKey.length);
+ return keySerialization.deserialize(inBuf, null, conf);
}
/** Reads the final key from the file.
*
* @param key key to read into
*/
- public synchronized void finalKey(WritableComparable key)
- throws IOException {
+ public synchronized Object finalKey(Object key) throws IOException {
long originalPosition = data.getPosition(); // save position
try {
@@ -542,8 +600,12 @@ public class MapFile {
} else {
reset(); // start at the beginning
}
- while (data.next(key)) {} // scan to eof
-
+ Object prevKey = null;
+ do {
+ prevKey = key;
+ key = data.nextKey(key);
+ } while (key != null);
+ return prevKey;
} finally {
data.seek(originalPosition); // restore position
}
@@ -553,7 +615,7 @@ public class MapFile {
* first entry after the named key. Returns true iff the named key exists
* in this map.
*/
- public synchronized boolean seek(WritableComparable key) throws IOException {
+ public synchronized boolean seek(Object key) throws IOException {
return seekInternal(key) == 0;
}
@@ -565,7 +627,7 @@ public class MapFile {
* < 0 - positioned at next record
* 1 - no more records in file
*/
- private synchronized int seekInternal(WritableComparable key)
+ private synchronized int seekInternal(Object key)
throws IOException {
return seekInternal(key, false);
}
@@ -582,19 +644,24 @@ public class MapFile {
* < 0 - positioned at next record
* 1 - no more records in file
*/
- private synchronized int seekInternal(WritableComparable key,
- final boolean before)
- throws IOException {
+ private synchronized int seekInternal(Object key,
+ final boolean before
+ ) throws IOException {
readIndex(); // make sure index is read
+ DataOutputBuffer keyBuffer = new DataOutputBuffer();
+ keySerialization.serialize(keyBuffer, key);
if (seekIndex != -1 // seeked before
&& seekIndex+1 < count
- && comparator.compare(key, keys[seekIndex+1])<0 // before next indexed
- && comparator.compare(key, nextKey)
- >= 0) { // but after last seeked
+ && comparator.compare(keyBuffer.getData(), 0, keyBuffer.getLength(),
+ keys[seekIndex+1], 0, keys[seekIndex+1].length)
+ < 0 // before next indexed
+ && comparator.compare(keyBuffer.getData(), 0, keyBuffer.getLength(),
+ nextKey.getData(), 0, nextKey.getLength())
+ >= 0) { // but after last seeked
// do nothing
} else {
- seekIndex = binarySearch(key);
+ seekIndex = binarySearch(keyBuffer.getData(), keyBuffer.getLength());
if (seekIndex < 0) // decode insertion point
seekIndex = -seekIndex-2;
@@ -605,17 +672,15 @@ public class MapFile {
}
data.seek(seekPosition);
- if (nextKey == null)
- nextKey = comparator.newKey();
-
// If we're looking for the key before, we need to keep track
// of the position we got the current key as well as the position
// of the key before it.
long prevPosition = -1;
long curPosition = seekPosition;
- while (data.next(nextKey)) {
- int c = comparator.compare(key, nextKey);
+ while (data.nextRawKey(nextKey) != -1) {
+ int c = comparator.compare(keyBuffer.getData(), 0, keyBuffer.getLength(),
+ nextKey.getData(), 0 , nextKey.getLength());
if (c <= 0) { // at or beyond desired
if (before && c != 0) {
if (prevPosition == -1) {
@@ -627,7 +692,7 @@ public class MapFile {
} else {
// We have a previous record to back up to
data.seek(prevPosition);
- data.next(nextKey);
+ data.nextRawKey(nextKey);
// now that we've rewound, the search key must be greater than this key
return 1;
}
@@ -639,18 +704,24 @@ public class MapFile {
curPosition = data.getPosition();
}
}
-
+ // if we have fallen off the end of the file and we want the before key
+ // then back up to the previous key
+ if (before && prevPosition != -1) {
+ data.seek(prevPosition);
+ data.nextRawKey(nextKey);
+ }
return 1;
}
- private int binarySearch(WritableComparable key) {
+ private int binarySearch(byte[] key, int length) {
int low = 0;
int high = count-1;
while (low <= high) {
int mid = (low + high) >>> 1;
- WritableComparable midVal = keys[mid];
- int cmp = comparator.compare(midVal, key);
+ byte[] midVal = keys[mid];
+ int cmp = comparator.compare(midVal, 0, midVal.length,
+ key, 0, length);
if (cmp < 0)
low = mid + 1;
@@ -664,18 +735,59 @@ public class MapFile {
/** Read the next key/value pair in the map into <code>key</code> and
* <code>val</code>. Returns true if such a pair exists and false when at
- * the end of the map */
+ * the end of the map
+ * @deprecated Use {@link #nextKey} and {@link #getCurrentValue} instead.
+ */
+ @SuppressWarnings("unchecked")
+ @Deprecated
public synchronized boolean next(WritableComparable key, Writable val)
throws IOException {
return data.next(key, val);
}
+
+ /**
+ * Read the next key in the map.
+ * @param reusable an object that may be re-used for holding the next key
+ * @return the key that was read or null if there is not another key
+ * @throws IOException
+ */
+ public Object nextKey(Object reusable) throws IOException {
+ return data.nextKey(reusable);
+ }
+
+ /**
+ * Get the current value in the map.
+ * @param reusable an object that may be re-used for holding the value
+ * @return the value that was read in
+ * @throws IOException
+ */
+ public Object getCurrentValue(Object reusable) throws IOException {
+ return data.getCurrentValue(reusable);
+ }
+
+ /**
+ * Return the value for the named key, or null if none exists.
+ * @param key the key to look for
+ * @param value an object to read into
+ * @return the value that was found or null if the key wasn't found
+ * @throws IOException
+ * @deprecated Use {@link #seek} and {@link #getCurrentValue} instead.
+ */
+ @SuppressWarnings("unchecked")
+ @Deprecated
+ public synchronized Writable get(WritableComparable key,
+ Writable value) throws IOException {
+ if (seek(key)) {
+ return (Writable) data.getCurrentValue(value);
+ } else {
+ return null;
+ }
+ }
/** Return the value for the named key, or null if none exists. */
- public synchronized Writable get(WritableComparable key, Writable val)
- throws IOException {
+ public synchronized Object get(Object key, Object val) throws IOException{
if (seek(key)) {
- data.getCurrentValue(val);
- return val;
+ return data.getCurrentValue(val);
} else
return null;
}
@@ -689,9 +801,8 @@ public class MapFile {
- * @param val - data value if key is found
- * @return - the key that was the closest match or null if eof.
*/
- public synchronized WritableComparable getClosest(WritableComparable key,
- Writable val)
- throws IOException {
+ public Object getClosest(Object key,
+ Object val) throws IOException {
return getClosest(key, val, false);
}
@@ -705,9 +816,10 @@ public class MapFile {
* return the record that sorts just after.
* @return - the key that was the closest match or null if eof.
*/
- public synchronized WritableComparable getClosest(WritableComparable key,
- Writable val, final boolean before)
- throws IOException {
+ public synchronized Object getClosest(Object key,
+ Object val,
+ final boolean before
+ ) throws IOException {
int c = seekInternal(key, before);
@@ -720,7 +832,9 @@ public class MapFile {
}
data.getCurrentValue(val);
- return nextKey;
+ // deserialize the key
+ inBuf.reset(nextKey.getData(), 0, nextKey.getLength());
+ return keySerialization.deserialize(inBuf, null, conf);
}
/** Close the map. */
@@ -764,17 +878,24 @@ public class MapFile {
* @return number of valid entries in this MapFile, or -1 if no fixing was needed
* @throws Exception
*/
+ @SuppressWarnings("unchecked")
public static long fix(FileSystem fs, Path dir,
- Class<? extends Writable> keyClass,
- Class<? extends Writable> valueClass, boolean dryrun,
- Configuration conf) throws Exception {
+ Class<?> keyClass,
+ Class<?> valueClass, boolean dryrun,
+ Configuration conf) throws IOException {
String dr = (dryrun ? "[DRY RUN ] " : "");
Path data = new Path(dir, DATA_FILE_NAME);
Path index = new Path(dir, INDEX_FILE_NAME);
int indexInterval = conf.getInt(Writer.INDEX_INTERVAL, 128);
+ SerializationFactory factory = SerializationFactory.getInstance(conf);
+ Serialization<Object> keySerialization = (Serialization<Object>)
+ factory.getSerializationByType(keyClass);
+ Serialization<Object> valueSerialization = (Serialization<Object>)
+ factory.getSerializationByType(valueClass);
if (!fs.exists(data)) {
// there's nothing we can do to fix this!
- throw new Exception(dr + "Missing data file in " + dir + ", impossible to fix this.");
+ throw new IOException(dr + "Missing data file in " + dir +
+ ", impossible to fix this.");
}
if (fs.exists(index)) {
// no fixing needed
@@ -782,17 +903,17 @@ public class MapFile {
}
SequenceFile.Reader dataReader =
new SequenceFile.Reader(conf, SequenceFile.Reader.file(data));
- if (!dataReader.getKeyClass().equals(keyClass)) {
- throw new Exception(dr + "Wrong key class in " + dir + ", expected" + keyClass.getName() +
- ", got " + dataReader.getKeyClass().getName());
- }
- if (!dataReader.getValueClass().equals(valueClass)) {
- throw new Exception(dr + "Wrong value class in " + dir + ", expected" + valueClass.getName() +
- ", got " + dataReader.getValueClass().getName());
+ if (!dataReader.getKeySerialization().equals(keySerialization)) {
+ throw new IOException(dr + "Wrong key serialization in " + dir +
+ ", expected" + keySerialization +
+ ", got " + dataReader.getKeySerialization());
+ }
+ if (!dataReader.getValueSerialization().equals(valueSerialization)) {
+ throw new IOException(dr + "Wrong value serialization in " + dir +
+ ", expected" + valueSerialization +
+ ", got " + dataReader.getValueSerialization());
}
long cnt = 0L;
- Writable key = ReflectionUtils.newInstance(keyClass, conf);
- Writable value = ReflectionUtils.newInstance(valueClass, conf);
SequenceFile.Writer indexWriter = null;
if (!dryrun) {
indexWriter =
@@ -805,7 +926,10 @@ public class MapFile {
try {
long pos = 0L;
LongWritable position = new LongWritable();
- while(dataReader.next(key, value)) {
+ Object key = null;
+ Object value = null;
+ while((key = dataReader.nextKey(key)) != null) {
+ value = dataReader.getCurrentValue(value);
cnt++;
if (cnt % indexInterval == 0) {
position.set(pos);
@@ -834,21 +958,21 @@ public class MapFile {
String out = args[1];
Configuration conf = new Configuration();
- FileSystem fs = FileSystem.getLocal(conf);
- MapFile.Reader reader = new MapFile.Reader(fs, in, conf);
+ MapFile.Reader reader = new MapFile.Reader(new Path(in), conf);
+ Serialization<?> keySerialization = reader.getKeySerialization();
+ Serialization<?> valueSerialization = reader.getValueSerialization();
MapFile.Writer writer =
- new MapFile.Writer(conf, fs, out,
- reader.getKeyClass().asSubclass(WritableComparable.class),
- reader.getValueClass());
-
- WritableComparable key =
- ReflectionUtils.newInstance(reader.getKeyClass().asSubclass(WritableComparable.class), conf);
- Writable value =
- ReflectionUtils.newInstance(reader.getValueClass().asSubclass(Writable.class), conf);
+ new MapFile.Writer(conf, new Path(out),
+ Writer.keySerialization(keySerialization),
+ Writer.valueSerialization(valueSerialization));
- while (reader.next(key, value)) // copy all entries
- writer.append(key, value);
+ Object key = null;
+ Object value = null;
+ while ((key = reader.nextKey(key)) != null) { // copy all entries
+ value = reader.getCurrentValue(value);
+ writer.append(key, value);
+ }
writer.close();
}
Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/RawComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/RawComparator.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/RawComparator.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/RawComparator.java Sat Dec 4 07:13:10 2010
@@ -22,7 +22,6 @@ import java.util.Comparator;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.serializer.DeserializerComparator;
/**
* <p>
@@ -30,12 +29,15 @@ import org.apache.hadoop.io.serializer.D
* objects.
* </p>
* @param <T>
- * @see DeserializerComparator
+ * @deprecated Use {@link org.apache.hadoop.io.serial.RawComparator} instead.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
-public interface RawComparator<T> extends Comparator<T> {
+@Deprecated
+public interface RawComparator<T>
+ extends Comparator<T>, org.apache.hadoop.io.serial.RawComparator {
+ @Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}