You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2010/12/04 08:13:12 UTC
svn commit: r1042107 [2/6] - in /hadoop/common/branches/HADOOP-6685: ./ ivy/
src/java/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/io/
src/java/org/apache/hadoop/io/file/tfile/
src/java/org/apache/hadoop/io/serial/ src/java/org/apache/had...
Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/SequenceFile.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/SequenceFile.java Sat Dec 4 07:13:10 2010
@@ -34,12 +34,14 @@ import org.apache.hadoop.io.compress.Dec
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serial.RawComparator;
+import org.apache.hadoop.io.serial.Serialization;
+import org.apache.hadoop.io.serial.SerializationFactory;
+import org.apache.hadoop.io.serial.TypedSerialization;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.*;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
@@ -93,10 +95,16 @@ import org.apache.hadoop.util.PriorityQu
* version number (e.g. SEQ4 or SEQ6)
* </li>
* <li>
- * keyClassName -key class
+ * key serialization name
* </li>
* <li>
- * valueClassName - value class
+ * key serialization configuration
+ * </li>
+ * <li>
+ * value serialization name
+ * </li>
+ * <li>
 * value serialization configuration
* </li>
* <li>
* compression - A boolean which specifies if compression is turned on for
@@ -134,7 +142,7 @@ import org.apache.hadoop.util.PriorityQu
* </ul>
* </li>
* <li>
- * A sync-marker every few <code>100</code> bytes or so.
+ * A sync-marker every <code>2000</code> bytes or so.
* </li>
* </ul>
*
@@ -153,7 +161,7 @@ import org.apache.hadoop.util.PriorityQu
* </ul>
* </li>
* <li>
- * A sync-marker every few <code>100</code> bytes or so.
+ * A sync-marker every <code>2000</code> bytes or so.
* </li>
* </ul>
*
@@ -165,6 +173,7 @@ import org.apache.hadoop.util.PriorityQu
* <li>
* Record <i>Block</i>
* <ul>
+ * <li>sync-marker</li>
* <li>Compressed key-lengths block-size</li>
* <li>Compressed key-lengths block</li>
* <li>Compressed keys block-size</li>
@@ -175,9 +184,6 @@ import org.apache.hadoop.util.PriorityQu
* <li>Compressed values block</li>
* </ul>
* </li>
- * <li>
- * A sync-marker every few <code>100</code> bytes or so.
- * </li>
* </ul>
*
* <p>The compressed blocks of key lengths and value lengths consist of the
@@ -196,8 +202,9 @@ public class SequenceFile {
private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
private static final byte VERSION_WITH_METADATA = (byte)6;
+ private static final byte SERIALIZATION_VERSION = (byte) 7;
private static byte[] VERSION = new byte[] {
- (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
+ (byte)'S', (byte)'E', (byte)'Q', SERIALIZATION_VERSION
};
private static final int SYNC_ESCAPE = -1; // "length" of sync entries
@@ -285,6 +292,7 @@ public class SequenceFile {
* instead.
*/
@Deprecated
+ @SuppressWarnings("unchecked")
public static Writer
createWriter(FileSystem fs, Configuration conf, Path name,
Class keyClass, Class valClass) throws IOException {
@@ -306,6 +314,7 @@ public class SequenceFile {
* instead.
*/
@Deprecated
+ @SuppressWarnings("unchecked")
public static Writer
createWriter(FileSystem fs, Configuration conf, Path name,
Class keyClass, Class valClass,
@@ -330,6 +339,7 @@ public class SequenceFile {
* instead.
*/
@Deprecated
+ @SuppressWarnings("unchecked")
public static Writer
createWriter(FileSystem fs, Configuration conf, Path name,
Class keyClass, Class valClass, CompressionType compressionType,
@@ -355,6 +365,7 @@ public class SequenceFile {
* instead.
*/
@Deprecated
+ @SuppressWarnings("unchecked")
public static Writer
createWriter(FileSystem fs, Configuration conf, Path name,
Class keyClass, Class valClass, CompressionType compressionType,
@@ -381,6 +392,7 @@ public class SequenceFile {
* instead.
*/
@Deprecated
+ @SuppressWarnings("unchecked")
public static Writer
createWriter(FileSystem fs, Configuration conf, Path name,
Class keyClass, Class valClass,
@@ -413,6 +425,7 @@ public class SequenceFile {
* instead.
*/
@Deprecated
+ @SuppressWarnings("unchecked")
public static Writer
createWriter(FileSystem fs, Configuration conf, Path name,
Class keyClass, Class valClass, int bufferSize,
@@ -444,6 +457,7 @@ public class SequenceFile {
* @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
* instead.
*/
+ @SuppressWarnings("unchecked")
@Deprecated
public static Writer
createWriter(FileSystem fs, Configuration conf, Path name,
@@ -471,6 +485,7 @@ public class SequenceFile {
* instead.
*/
@Deprecated
+ @SuppressWarnings("unchecked")
public static Writer
createWriter(Configuration conf, FSDataOutputStream out,
Class keyClass, Class valClass,
@@ -495,6 +510,7 @@ public class SequenceFile {
* @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
* instead.
*/
+ @SuppressWarnings("unchecked")
@Deprecated
public static Writer
createWriter(Configuration conf, FSDataOutputStream out,
@@ -527,18 +543,44 @@ public class SequenceFile {
* Size of stored data.
*/
public int getSize();
+
+ }
+
+ /**
+ * Make an InputStream from a ValueBytes.
+ * @param bytes the bytes to provide as input
+ * @return a new input stream with the bytes
+ * @throws IOException
+ */
+ private static InputStream readUncompressedBytes(ValueBytes bytes
+ ) throws IOException {
+ DataInputBuffer result = new DataInputBuffer();
+ if (bytes instanceof UncompressedBytes) {
+ MutableValueBytes concrete = (MutableValueBytes) bytes;
+ result.reset(concrete.data, concrete.dataSize);
+ } else {
+ DataOutputBuffer outBuf = new DataOutputBuffer();
+ bytes.writeUncompressedBytes(outBuf);
+ result.reset(outBuf.getData(), outBuf.getLength());
+ }
+ return result;
}
+
- private static class UncompressedBytes implements ValueBytes {
- private int dataSize;
- private byte[] data;
-
- private UncompressedBytes() {
+ private static abstract class MutableValueBytes implements ValueBytes {
+ protected byte[] data;
+ protected int dataSize;
+
+ MutableValueBytes() {
data = null;
dataSize = 0;
}
+
+ public int getSize() {
+ return dataSize;
+ }
- private void reset(DataInputStream in, int length) throws IOException {
+ void reset(DataInputStream in, int length) throws IOException {
if (data == null) {
data = new byte[length];
} else if (length > data.length) {
@@ -548,10 +590,14 @@ public class SequenceFile {
in.readFully(data, 0, length);
dataSize = length;
}
-
- public int getSize() {
- return dataSize;
+
+ void set(MutableValueBytes other) {
+ data = other.data;
+ dataSize = other.dataSize;
}
+ }
+
+ private static class UncompressedBytes extends MutableValueBytes {
public void writeUncompressedBytes(DataOutputStream outStream)
throws IOException {
@@ -566,34 +612,15 @@ public class SequenceFile {
} // UncompressedBytes
- private static class CompressedBytes implements ValueBytes {
- private int dataSize;
- private byte[] data;
+ private static class CompressedBytes extends MutableValueBytes {
DataInputBuffer rawData = null;
CompressionCodec codec = null;
CompressionInputStream decompressedStream = null;
private CompressedBytes(CompressionCodec codec) {
- data = null;
- dataSize = 0;
this.codec = codec;
}
- private void reset(DataInputStream in, int length) throws IOException {
- if (data == null) {
- data = new byte[length];
- } else if (length > data.length) {
- data = new byte[Math.max(length, data.length * 2)];
- }
- dataSize = -1;
- in.readFully(data, 0, length);
- dataSize = length;
- }
-
- public int getSize() {
- return dataSize;
- }
-
public void writeUncompressedBytes(DataOutputStream outStream)
throws IOException {
if (decompressedStream == null) {
@@ -738,9 +765,6 @@ public class SequenceFile {
boolean ownOutputStream = true;
DataOutputBuffer buffer = new DataOutputBuffer();
- Class keyClass;
- Class valClass;
-
private final CompressionType compress;
CompressionCodec codec = null;
CompressionOutputStream deflateFilter = null;
@@ -748,9 +772,8 @@ public class SequenceFile {
Metadata metadata = null;
Compressor compressor = null;
- protected Serializer keySerializer;
- protected Serializer uncompressedValSerializer;
- protected Serializer compressedValSerializer;
+ protected Serialization<Object> keySerialization;
+ protected Serialization<Object> valueSerialization;
// Insert a globally unique 16-byte value every few entries, so that one
// can seek into the middle of a file and then synchronize with record
@@ -817,6 +840,20 @@ public class SequenceFile {
}
}
+ static class KeySerialization extends Options.SerializationOption
+ implements Option {
+ KeySerialization(Serialization<?> value) {
+ super(value);
+ }
+ }
+
+ static class ValueSerialization extends Options.SerializationOption
+ implements Option {
+ ValueSerialization(Serialization<?> value) {
+ super(value);
+ }
+ }
+
static class MetadataOption implements Option {
private final Metadata value;
MetadataOption(Metadata value) {
@@ -878,6 +915,14 @@ public class SequenceFile {
return new ProgressableOption(value);
}
+ public static Option keySerialization(Serialization<?> value) {
+ return new KeySerialization(value);
+ }
+
+ public static Option valueSerialization(Serialization<?> value) {
+ return new ValueSerialization(value);
+ }
+
public static Option keyClass(Class<?> value) {
return new KeyClassOption(value);
}
@@ -905,6 +950,7 @@ public class SequenceFile {
* @param options the options used when creating the writer
* @throws IOException if it fails
*/
+ @SuppressWarnings("unchecked")
Writer(Configuration conf,
Option... opts) throws IOException {
BlockSizeOption blockSizeOption =
@@ -917,6 +963,10 @@ public class SequenceFile {
Options.getOption(ProgressableOption.class, opts);
FileOption fileOption = Options.getOption(FileOption.class, opts);
StreamOption streamOption = Options.getOption(StreamOption.class, opts);
+ KeySerialization keySerializationOption =
+ Options.getOption(KeySerialization.class, opts);
+ ValueSerialization valueSerializationOption =
+ Options.getOption(ValueSerialization.class, opts);
KeyClassOption keyClassOption =
Options.getOption(KeyClassOption.class, opts);
ValueClassOption valueClassOption =
@@ -936,6 +986,15 @@ public class SequenceFile {
throw new IllegalArgumentException("file modifier options not " +
"compatible with stream");
}
+ // exactly one of serialization or class must be set.
+ if ((keySerializationOption == null) == (keyClassOption == null)) {
+ throw new IllegalArgumentException("Either keySerialization or " +
+ " keyClass must be set.");
+ }
+ if ((valueSerializationOption == null) == (valueClassOption == null)) {
+ throw new IllegalArgumentException("Either valueSerialization or " +
+ " valueClass must be set.");
+ }
FSDataOutputStream out;
boolean ownStream = fileOption != null;
@@ -955,10 +1014,31 @@ public class SequenceFile {
} else {
out = streamOption.getValue();
}
- Class<?> keyClass = keyClassOption == null ?
- Object.class : keyClassOption.getValue();
- Class<?> valueClass = valueClassOption == null ?
- Object.class : valueClassOption.getValue();
+
+ // find the key serialization by parameter or by key type
+ Serialization<Object> keySerialization;
+ if (keyClassOption != null) {
+ Class<?> keyClass = keyClassOption.getValue();
+ SerializationFactory factory = SerializationFactory.getInstance(conf);
+ keySerialization =
+ (Serialization<Object>) factory.getSerializationByType(keyClass);
+ } else {
+ keySerialization =
+ (Serialization<Object>) keySerializationOption.getValue();
+ }
+
+ // find the value serialization by parameter or by value type
+ Serialization<Object> valueSerialization;
+ if (valueClassOption != null) {
+ Class<?> valueClass = valueClassOption.getValue();
+ SerializationFactory factory = SerializationFactory.getInstance(conf);
+ valueSerialization =
+ (Serialization<Object>) factory.getSerializationByType(valueClass);
+ } else {
+ valueSerialization =
+ (Serialization<Object>) valueSerializationOption.getValue();
+ }
+
Metadata metadata = metadataOption == null ?
new Metadata() : metadataOption.getValue();
this.compress = compressionTypeOption.getValue();
@@ -971,7 +1051,8 @@ public class SequenceFile {
"GzipCodec without native-hadoop " +
"code!");
}
- init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
+ init(conf, out, ownStream, keySerialization, valueSerialization,
+ codec, metadata);
}
/** Create the named file.
@@ -979,11 +1060,15 @@ public class SequenceFile {
* {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
* instead.
*/
+ @SuppressWarnings("unchecked")
@Deprecated
public Writer(FileSystem fs, Configuration conf, Path name,
Class keyClass, Class valClass) throws IOException {
this.compress = CompressionType.NONE;
- init(conf, fs.create(name), true, keyClass, valClass, null,
+ SerializationFactory factory = SerializationFactory.getInstance(conf);
+ init(conf, fs.create(name), true,
+ factory.getSerializationByType(keyClass),
+ factory.getSerializationByType(valClass), null,
new Metadata());
}
@@ -992,12 +1077,16 @@ public class SequenceFile {
* {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
* instead.
*/
+ @SuppressWarnings("unchecked")
@Deprecated
public Writer(FileSystem fs, Configuration conf, Path name,
Class keyClass, Class valClass,
Progressable progress, Metadata metadata) throws IOException {
this.compress = CompressionType.NONE;
- init(conf, fs.create(name, progress), true, keyClass, valClass,
+ SerializationFactory factory = SerializationFactory.getInstance(conf);
+ init(conf, fs.create(name, progress), true,
+ factory.getSerializationByType(keyClass),
+ factory.getSerializationByType(valClass),
null, metadata);
}
@@ -1006,15 +1095,18 @@ public class SequenceFile {
* {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
* instead.
*/
+ @SuppressWarnings("unchecked")
@Deprecated
public Writer(FileSystem fs, Configuration conf, Path name,
Class keyClass, Class valClass,
int bufferSize, short replication, long blockSize,
Progressable progress, Metadata metadata) throws IOException {
this.compress = CompressionType.NONE;
+ SerializationFactory factory = SerializationFactory.getInstance(conf);
init(conf,
fs.create(name, true, bufferSize, replication, blockSize, progress),
- true, keyClass, valClass, null, metadata);
+ true, factory.getSerializationByType(keyClass),
+ factory.getSerializationByType(valClass), null, metadata);
}
boolean isCompressed() { return compress != CompressionType.NONE; }
@@ -1024,8 +1116,20 @@ public class SequenceFile {
private void writeFileHeader()
throws IOException {
out.write(VERSION);
- Text.writeString(out, keyClass.getName());
- Text.writeString(out, valClass.getName());
+
+ // write out key serialization
+ Text.writeString(out, keySerialization.getName());
+ buffer.reset();
+ keySerialization.serializeSelf(buffer);
+ WritableUtils.writeVInt(out, buffer.getLength());
+ out.write(buffer.getData(), 0, buffer.getLength());
+
+ // write out value serialization
+ Text.writeString(out, valueSerialization.getName());
+ buffer.reset();
+ valueSerialization.serializeSelf(buffer);
+ WritableUtils.writeVInt(out, buffer.getLength());
+ out.write(buffer.getData(), 0, buffer.getLength());
out.writeBoolean(this.isCompressed());
out.writeBoolean(this.isBlockCompressed());
@@ -1039,40 +1143,74 @@ public class SequenceFile {
}
/** Initialize. */
- @SuppressWarnings("unchecked")
void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
- Class keyClass, Class valClass,
+ Serialization<Object> keySerialization,
+ Serialization<Object> valueSerialization,
CompressionCodec codec, Metadata metadata)
throws IOException {
this.conf = conf;
this.out = out;
this.ownOutputStream = ownStream;
- this.keyClass = keyClass;
- this.valClass = valClass;
+ this.keySerialization = keySerialization;
+ this.valueSerialization = valueSerialization;
this.codec = codec;
this.metadata = metadata;
- SerializationFactory serializationFactory = new SerializationFactory(conf);
- this.keySerializer = serializationFactory.getSerializer(keyClass);
- this.keySerializer.open(buffer);
- this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
- this.uncompressedValSerializer.open(buffer);
if (this.codec != null) {
ReflectionUtils.setConf(this.codec, this.conf);
this.compressor = CodecPool.getCompressor(this.codec);
this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
this.deflateOut =
new DataOutputStream(new BufferedOutputStream(deflateFilter));
- this.compressedValSerializer = serializationFactory.getSerializer(valClass);
- this.compressedValSerializer.open(deflateOut);
}
writeFileHeader();
}
- /** Returns the class of keys in this file. */
- public Class getKeyClass() { return keyClass; }
+ /** Returns the class of keys in this file. Only works
+ * if a TypedSerialization is used; otherwise Object is returned.
+ * @deprecated Use {@link #getKeySerialization} instead.
+ */
+ @SuppressWarnings("unchecked")
+ @Deprecated
+ public Class getKeyClass() {
+ Class result = null;
+ if (keySerialization instanceof TypedSerialization<?>) {
+ TypedSerialization typed = (TypedSerialization) keySerialization;
+ result = typed.getSpecificType();
+ }
+ return result == null ? Object.class : result;
+ }
- /** Returns the class of values in this file. */
- public Class getValueClass() { return valClass; }
+
+ /** Returns the class of values in this file. Only works
+ * if a TypedSerialization is used; otherwise Object is returned.
+ * @deprecated Use {@link #getValueSerialization} instead.
+ */
+ @SuppressWarnings("unchecked")
+ @Deprecated
+ public Class getValueClass() {
+ Class result = null;
+ if (valueSerialization instanceof TypedSerialization<?>) {
+ TypedSerialization typed = (TypedSerialization) valueSerialization;
+ result = typed.getSpecificType();
+ }
+ return result == null ? Object.class : result;
+ }
+
+ /**
+ * Return the serialization that is used to serialize the keys.
+ * @return the key serialization
+ */
+ public Serialization<?> getKeySerialization() {
+ return keySerialization;
+ }
+
+ /**
+ * Return the serialization that is used to serialize the values.
+ * @return the value serialization
+ */
+ public Serialization<?> getValueSerialization() {
+ return valueSerialization;
+ }
/** Returns the compression codec of data in this file. */
public CompressionCodec getCompressionCodec() { return codec; }
@@ -1091,12 +1229,6 @@ public class SequenceFile {
/** Close the file. */
public synchronized void close() throws IOException {
- keySerializer.close();
- uncompressedValSerializer.close();
- if (compressedValSerializer != null) {
- compressedValSerializer.close();
- }
-
CodecPool.returnCompressor(compressor);
compressor = null;
@@ -1119,27 +1251,20 @@ public class SequenceFile {
}
}
- /** Append a key/value pair. */
- public void append(Writable key, Writable val)
- throws IOException {
+ /** Append a key/value pair.
+ */
+ public void append(Writable key, Writable val) throws IOException {
append((Object) key, (Object) val);
}
/** Append a key/value pair. */
- @SuppressWarnings("unchecked")
public synchronized void append(Object key, Object val)
throws IOException {
- if (key.getClass() != keyClass)
- throw new IOException("wrong key class: "+key.getClass().getName()
- +" is not "+keyClass);
- if (val.getClass() != valClass)
- throw new IOException("wrong value class: "+val.getClass().getName()
- +" is not "+valClass);
buffer.reset();
// Append the 'key'
- keySerializer.serialize(key);
+ keySerialization.serialize(buffer, key);
int keyLength = buffer.getLength();
if (keyLength < 0)
throw new IOException("negative length keys not allowed: " + key);
@@ -1147,11 +1272,11 @@ public class SequenceFile {
// Append the 'value'
if (compress == CompressionType.RECORD) {
deflateFilter.resetState();
- compressedValSerializer.serialize(val);
+ valueSerialization.serialize(deflateFilter, val);
deflateOut.flush();
deflateFilter.finish();
} else {
- uncompressedValSerializer.serialize(val);
+ valueSerialization.serialize(buffer, val);
}
// Write the record out
@@ -1200,27 +1325,18 @@ public class SequenceFile {
}
/** Append a key/value pair. */
- @SuppressWarnings("unchecked")
- public synchronized void append(Object key, Object val)
- throws IOException {
- if (key.getClass() != keyClass)
- throw new IOException("wrong key class: "+key.getClass().getName()
- +" is not "+keyClass);
- if (val.getClass() != valClass)
- throw new IOException("wrong value class: "+val.getClass().getName()
- +" is not "+valClass);
-
- buffer.reset();
+ public synchronized void append(Object key, Object val) throws IOException {
// Append the 'key'
- keySerializer.serialize(key);
+ buffer.reset();
+ keySerialization.serialize(buffer, key);
int keyLength = buffer.getLength();
if (keyLength < 0)
throw new IOException("negative length keys not allowed: " + key);
// Compress 'value' and append it
deflateFilter.resetState();
- compressedValSerializer.serialize(val);
+ valueSerialization.serialize(deflateFilter, val);
deflateOut.flush();
deflateFilter.finish();
@@ -1267,10 +1383,6 @@ public class SequenceFile {
super(conf, options);
compressionBlockSize =
conf.getInt("io.seqfile.compress.blocksize", 1000000);
- keySerializer.close();
- keySerializer.open(keyBuffer);
- uncompressedValSerializer.close();
- uncompressedValSerializer.open(valBuffer);
}
/** Workhorse to check and write out compressed data/lengths */
@@ -1326,24 +1438,18 @@ public class SequenceFile {
}
/** Append a key/value pair. */
- @SuppressWarnings("unchecked")
- public synchronized void append(Object key, Object val)
- throws IOException {
- if (key.getClass() != keyClass)
- throw new IOException("wrong key class: "+key+" is not "+keyClass);
- if (val.getClass() != valClass)
- throw new IOException("wrong value class: "+val+" is not "+valClass);
+ public synchronized void append(Object key, Object val) throws IOException {
// Save key/value into respective buffers
int oldKeyLength = keyBuffer.getLength();
- keySerializer.serialize(key);
+ keySerialization.serialize(keyBuffer, key);
int keyLength = keyBuffer.getLength() - oldKeyLength;
if (keyLength < 0)
throw new IOException("negative length keys not allowed: " + key);
WritableUtils.writeVInt(keyLenBuffer, keyLength);
int oldValLength = valBuffer.getLength();
- uncompressedValSerializer.serialize(val);
+ valueSerialization.serialize(valBuffer, val);
int valLength = valBuffer.getLength() - oldValLength;
WritableUtils.writeVInt(valLenBuffer, valLength);
@@ -1393,15 +1499,9 @@ public class SequenceFile {
public static class Reader implements java.io.Closeable {
private String filename;
private FSDataInputStream in;
- private DataOutputBuffer outBuf = new DataOutputBuffer();
private byte version;
- private String keyClassName;
- private String valClassName;
- private Class keyClass;
- private Class valClass;
-
private CompressionCodec codec = null;
private Metadata metadata = null;
@@ -1411,8 +1511,6 @@ public class SequenceFile {
private long headerEnd;
private long end;
- private int keyLength;
- private int recordLength;
private boolean decompress;
private boolean blockCompressed;
@@ -1420,17 +1518,12 @@ public class SequenceFile {
private Configuration conf;
private int noBufferedRecords = 0;
- private boolean lazyDecompress = true;
- private boolean valuesDecompressed = true;
-
- private int noBufferedKeys = 0;
- private int noBufferedValues = 0;
private DataInputBuffer keyLenBuffer = null;
private CompressionInputStream keyLenInFilter = null;
private DataInputStream keyLenIn = null;
private Decompressor keyLenDecompressor = null;
- private DataInputBuffer keyBuffer = null;
+ private DataInputBuffer keyBlockBuffer = null;
private CompressionInputStream keyInFilter = null;
private DataInputStream keyIn = null;
private Decompressor keyDecompressor = null;
@@ -1444,8 +1537,13 @@ public class SequenceFile {
private DataInputStream valIn = null;
private Decompressor valDecompressor = null;
- private Deserializer keyDeserializer;
- private Deserializer valDeserializer;
+ // used for object serialization
+ private DataOutputBuffer keyBuffer;
+ private MutableValueBytes valueBytes;
+ private DataInputBuffer serialBuffer;
+
+ private Serialization<Object> keySerialization;
+ private Serialization<Object> valueSerialization;
/**
* A tag interface for all of the Reader options
@@ -1471,6 +1569,24 @@ public class SequenceFile {
}
/**
+ * Create an option to specify the required key serialization.
+ * @param value the serialization to deserialize the key with
+ * @return a new option
+ */
+ public static Option keySerialization(Serialization<?> value) {
+ return new KeySerializationOption(value);
+ }
+
+ /**
+ * Create an option to specify the required value serialization.
+ * @param value the serialization to deserialize the value with
+ * @return a new option
+ */
+ public static Option valueSerialization(Serialization<?> value) {
+ return new ValueSerializationOption(value);
+ }
+
+ /**
* Create an option to specify the starting byte to read.
* @param value the number of bytes to skip over
* @return a new option
@@ -1541,6 +1657,22 @@ public class SequenceFile {
}
}
+ private static class KeySerializationOption
+ extends Options.SerializationOption
+ implements Option {
+ private KeySerializationOption(Serialization<?> value) {
+ super(value);
+ }
+ }
+
+ private static class ValueSerializationOption
+ extends Options.SerializationOption
+ implements Option {
+ private ValueSerializationOption(Serialization<?> value) {
+ super(value);
+ }
+ }
+
public Reader(Configuration conf, Option... opts) throws IOException {
// Look up the options, these are null if not set
FileOption fileOpt = Options.getOption(FileOption.class, opts);
@@ -1551,6 +1683,11 @@ public class SequenceFile {
BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts);
OnlyHeaderOption headerOnly =
Options.getOption(OnlyHeaderOption.class, opts);
+ KeySerializationOption keyOpt =
+ Options.getOption(KeySerializationOption.class, opts);
+ ValueSerializationOption valueOpt =
+ Options.getOption(ValueSerializationOption.class, opts);
+
// check for consistency
if ((fileOpt == null) == (streamOpt == null)) {
throw new
@@ -1560,6 +1697,7 @@ public class SequenceFile {
throw new IllegalArgumentException("buffer size can only be set when" +
" a file is specified.");
}
+
// figure out the real values
Path filename = null;
FSDataInputStream file;
@@ -1577,8 +1715,12 @@ public class SequenceFile {
file = streamOpt.getValue();
}
long start = startOpt == null ? 0 : startOpt.getValue();
+
// really set up
- initialize(filename, file, start, len, conf, headerOnly != null);
+ initialize(filename, file, start, len, conf,
+ (keyOpt == null ? null : keyOpt.getValue()),
+ (valueOpt == null ? null : valueOpt.getValue()),
+ headerOnly != null);
}
/**
@@ -1614,6 +1756,8 @@ public class SequenceFile {
/** Common work of the constructors. */
private void initialize(Path filename, FSDataInputStream in,
long start, long length, Configuration conf,
+ Serialization<?> keySerialization,
+ Serialization<?> valueSerialization,
boolean tempReader) throws IOException {
if (in == null) {
throw new IllegalArgumentException("in == null");
@@ -1625,12 +1769,11 @@ public class SequenceFile {
try {
seek(start);
this.end = this.in.getPos() + length;
- System.out.println("Setting end to " + end);
// if it wrapped around, use the max
if (end < length) {
end = Long.MAX_VALUE;
}
- init(tempReader);
+ init(tempReader, keySerialization, valueSerialization);
succeeded = true;
} finally {
if (!succeeded) {
@@ -1654,7 +1797,34 @@ public class SequenceFile {
int bufferSize, long length) throws IOException {
return fs.open(file, bufferSize);
}
-
+
+ @SuppressWarnings("unchecked")
+ private
+ Serialization<Object> readSerialization(SerializationFactory factory,
+ Serialization<?> override
+ ) throws IOException {
+ String serializationName = Text.readString(in);
+ Serialization<?> result;
+ if (override == null) {
+ result = factory.getSerialization(serializationName);
+ } else {
+ if (!serializationName.equals(override.getName())) {
+ throw new IllegalArgumentException("using serialization " +
+ override.getName() +
+ " instead of " +
+ serializationName);
+ }
+ result = override;
+ }
+ int keySerialLength = WritableUtils.readVInt(in);
+ DataInputBuffer buffer = new DataInputBuffer();
+ byte[] bytes = new byte[keySerialLength];
+ in.readFully(bytes);
+ buffer.reset(bytes, keySerialLength);
+ result.deserializeSelf(buffer, conf);
+ return (Serialization<Object>) result;
+ }
+
/**
* Initialize the {@link Reader}
* @param tmpReader <code>true</code> if we are constructing a temporary
@@ -1663,7 +1833,10 @@ public class SequenceFile {
* <code>false</code> otherwise.
* @throws IOException
*/
- private void init(boolean tempReader) throws IOException {
+ @SuppressWarnings({ "unchecked", "deprecation" })
+ private void init(boolean tempReader,
+ Serialization keySerialization,
+ Serialization valueSerialization) throws IOException {
byte[] versionBlock = new byte[VERSION.length];
in.readFully(versionBlock);
@@ -1677,17 +1850,39 @@ public class SequenceFile {
if (version > VERSION[3])
throw new VersionMismatchException(VERSION[3], version);
- if (version < BLOCK_COMPRESS_VERSION) {
- UTF8 className = new UTF8();
+ SerializationFactory factory = SerializationFactory.getInstance(conf);
+ if (version < SERIALIZATION_VERSION) {
+ String keyClassName;
+ String valueClassName;
+ if (version < BLOCK_COMPRESS_VERSION) {
+ UTF8 className = new UTF8();
- className.readFields(in);
- keyClassName = className.toString(); // key class name
+ className.readFields(in);
+ keyClassName = className.toString(); // key class name
- className.readFields(in);
- valClassName = className.toString(); // val class name
+ className.readFields(in);
+ valueClassName = className.toString(); // val class name
+ } else {
+ keyClassName = Text.readString(in);
+ valueClassName = Text.readString(in);
+ }
+ try {
+ this.keySerialization = (Serialization<Object>)
+ factory.getSerializationByType(conf.getClassByName(keyClassName));
+ } catch (ClassNotFoundException cnf) {
+ throw new RuntimeException("key class " + keyClassName +
+ " not found");
+ }
+ try {
+ this.valueSerialization = (Serialization<Object>)
+ factory.getSerializationByType(conf.getClassByName(valueClassName));
+ } catch (ClassNotFoundException cnf) {
+ throw new RuntimeException("value class " + valueClassName +
+ " not found");
+ }
} else {
- keyClassName = Text.readString(in);
- valClassName = Text.readString(in);
+ this.keySerialization = readSerialization(factory, keySerialization);
+ this.valueSerialization = readSerialization(factory,valueSerialization);
}
if (version > 2) { // if version > 2
@@ -1733,6 +1928,8 @@ public class SequenceFile {
// Initialize... *not* if this we are constructing a temporary Reader
if (!tempReader) {
+ keyBuffer = new DataOutputBuffer();
+ serialBuffer = new DataInputBuffer();
valBuffer = new DataInputBuffer();
if (decompress) {
valDecompressor = CodecPool.getDecompressor(codec);
@@ -1744,7 +1941,7 @@ public class SequenceFile {
if (blockCompressed) {
keyLenBuffer = new DataInputBuffer();
- keyBuffer = new DataInputBuffer();
+ keyBlockBuffer = new DataInputBuffer();
valLenBuffer = new DataInputBuffer();
keyLenDecompressor = CodecPool.getDecompressor(codec);
@@ -1753,7 +1950,8 @@ public class SequenceFile {
keyLenIn = new DataInputStream(keyLenInFilter);
keyDecompressor = CodecPool.getDecompressor(codec);
- keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
+ keyInFilter = codec.createInputStream(keyBlockBuffer,
+ keyDecompressor);
keyIn = new DataInputStream(keyInFilter);
valLenDecompressor = CodecPool.getDecompressor(codec);
@@ -1761,27 +1959,10 @@ public class SequenceFile {
valLenDecompressor);
valLenIn = new DataInputStream(valLenInFilter);
}
-
- SerializationFactory serializationFactory =
- new SerializationFactory(conf);
- this.keyDeserializer =
- getDeserializer(serializationFactory, getKeyClass());
- if (!blockCompressed) {
- this.keyDeserializer.open(valBuffer);
- } else {
- this.keyDeserializer.open(keyIn);
- }
- this.valDeserializer =
- getDeserializer(serializationFactory, getValueClass());
- this.valDeserializer.open(valIn);
+ valueBytes = (MutableValueBytes) createValueBytes();
}
}
- @SuppressWarnings("unchecked")
- private Deserializer getDeserializer(SerializationFactory sf, Class c) {
- return sf.getDeserializer(c);
- }
-
/** Close the file. */
public synchronized void close() throws IOException {
// Return the decompressors to the pool
@@ -1792,49 +1973,80 @@ public class SequenceFile {
keyLenDecompressor = keyDecompressor = null;
valLenDecompressor = valDecompressor = null;
- if (keyDeserializer != null) {
- keyDeserializer.close();
- }
- if (valDeserializer != null) {
- valDeserializer.close();
- }
-
// Close the input-stream
in.close();
}
- /** Returns the name of the key class. */
+ /**
+ * Return the name of the key class. It only works for
+ * TypedSerializations and otherwise returns Object.
+ * @return the key class name
+ * @deprecated Use {@link #getKeySerialization()} instead.
+ */
+ @Deprecated
public String getKeyClassName() {
- return keyClassName;
+ return getKeyClass().getName();
}
- /** Returns the class of keys in this file. */
+ /**
+ * Get the class of the keys in this file. It only works for
+ * TypedSerializations and otherwise returns Object.
+ * @return the class of the keys
+ * @deprecated Use {@link #getKeySerialization()} instead.
+ */
+ @Deprecated
+ @SuppressWarnings("unchecked")
public synchronized Class<?> getKeyClass() {
- if (null == keyClass) {
- try {
- keyClass = WritableName.getClass(getKeyClassName(), conf);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ Class result = null;
+ if (keySerialization instanceof TypedSerialization) {
+ TypedSerialization typed = (TypedSerialization) keySerialization;
+ result = typed.getSpecificType();
}
- return keyClass;
+ return result == null ? Object.class : result;
}
- /** Returns the name of the value class. */
+ /**
+ * Return the name of the value class. It only works for
+ * TypedSerializations and otherwise returns Object.
+ * @return the value class name
+ * @deprecated Use {@link #getValueSerialization()} instead.
+ */
+ @Deprecated
public String getValueClassName() {
- return valClassName;
+ return getValueClass().getName();
}
- /** Returns the class of values in this file. */
+ /**
+ * Get the class of the values in this file. It only works for
+ * TypedSerializations and otherwise returns Object.
+ * @return the class of the values
+ * @deprecated Use {@link #getValueSerialization()} instead.
+ */
+ @Deprecated
+ @SuppressWarnings("unchecked")
public synchronized Class<?> getValueClass() {
- if (null == valClass) {
- try {
- valClass = WritableName.getClass(getValueClassName(), conf);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ Class result = null;
+ if (valueSerialization instanceof TypedSerialization) {
+ TypedSerialization typed = (TypedSerialization) valueSerialization;
+ result = typed.getSpecificType();
}
- return valClass;
+ return result == null ? Object.class : result;
+ }
+
+ /**
+ * Get the serialization for the key.
+ * @return the key serialization
+ */
+ public Serialization<?> getKeySerialization() {
+ return keySerialization;
+ }
+
+ /**
+ * Get the serialization for the value.
+ * @return the value serialization
+ */
+ public Serialization<?> getValueSerialization() {
+ return valueSerialization;
}
/** Returns true if values are compressed. */
@@ -1888,16 +2100,9 @@ public class SequenceFile {
/** Read the next 'compressed' block */
private synchronized void readBlock() throws IOException {
- // Check if we need to throw away a whole block of
- // 'values' due to 'lazy decompression'
- if (lazyDecompress && !valuesDecompressed) {
- in.seek(WritableUtils.readVInt(in)+in.getPos());
- in.seek(WritableUtils.readVInt(in)+in.getPos());
- }
// Reset internal states
- noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0;
- valuesDecompressed = false;
+ noBufferedRecords = 0;
//Process sync
if (sync != null) {
@@ -1913,55 +2118,11 @@ public class SequenceFile {
// Read key lengths and keys
readBuffer(keyLenBuffer, keyLenInFilter);
- readBuffer(keyBuffer, keyInFilter);
- noBufferedKeys = noBufferedRecords;
+ readBuffer(keyBlockBuffer, keyInFilter);
// Read value lengths and values
- if (!lazyDecompress) {
- readBuffer(valLenBuffer, valLenInFilter);
- readBuffer(valBuffer, valInFilter);
- noBufferedValues = noBufferedRecords;
- valuesDecompressed = true;
- }
- }
-
- /**
- * Position valLenIn/valIn to the 'value'
- * corresponding to the 'current' key
- */
- private synchronized void seekToCurrentValue() throws IOException {
- if (!blockCompressed) {
- if (decompress) {
- valInFilter.resetState();
- }
- valBuffer.reset();
- } else {
- // Check if this is the first value in the 'block' to be read
- if (lazyDecompress && !valuesDecompressed) {
- // Read the value lengths and values
- readBuffer(valLenBuffer, valLenInFilter);
- readBuffer(valBuffer, valInFilter);
- noBufferedValues = noBufferedRecords;
- valuesDecompressed = true;
- }
-
- // Calculate the no. of bytes to skip
- // Note: 'current' key has already been read!
- int skipValBytes = 0;
- int currentKey = noBufferedKeys + 1;
- for (int i=noBufferedValues; i > currentKey; --i) {
- skipValBytes += WritableUtils.readVInt(valLenIn);
- --noBufferedValues;
- }
-
- // Skip to the 'val' corresponding to 'current' key
- if (skipValBytes > 0) {
- if (valIn.skipBytes(skipValBytes) != skipValBytes) {
- throw new IOException("Failed to seek to " + currentKey +
- "(th) value!");
- }
- }
- }
+ readBuffer(valLenBuffer, valLenInFilter);
+ readBuffer(valBuffer, valInFilter);
}
/**
@@ -1969,148 +2130,27 @@ public class SequenceFile {
* @param val : The 'value' to be read.
* @throws IOException
*/
- public synchronized void getCurrentValue(Writable val)
- throws IOException {
- if (val instanceof Configurable) {
- ((Configurable) val).setConf(this.conf);
- }
-
- // Position stream to 'current' value
- seekToCurrentValue();
-
- if (!blockCompressed) {
- val.readFields(valIn);
-
- if (valIn.read() > 0) {
- LOG.info("available bytes: " + valIn.available());
- throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
- + " bytes, should read " +
- (valBuffer.getLength()-keyLength));
- }
- } else {
- // Get the value
- int valLength = WritableUtils.readVInt(valLenIn);
- val.readFields(valIn);
-
- // Read another compressed 'value'
- --noBufferedValues;
-
- // Sanity check
- if ((valLength < 0) && LOG.isDebugEnabled()) {
- LOG.debug(val + " is a zero-length value");
- }
- }
-
- }
-
- /**
- * Get the 'value' corresponding to the last read 'key'.
- * @param val : The 'value' to be read.
- * @throws IOException
- */
- public synchronized Object getCurrentValue(Object val)
- throws IOException {
- if (val instanceof Configurable) {
- ((Configurable) val).setConf(this.conf);
- }
-
- // Position stream to 'current' value
- seekToCurrentValue();
-
- if (!blockCompressed) {
- val = deserializeValue(val);
-
- if (valIn.read() > 0) {
- LOG.info("available bytes: " + valIn.available());
- throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
- + " bytes, should read " +
- (valBuffer.getLength()-keyLength));
- }
- } else {
- // Get the value
- int valLength = WritableUtils.readVInt(valLenIn);
- val = deserializeValue(val);
-
- // Read another compressed 'value'
- --noBufferedValues;
-
- // Sanity check
- if ((valLength < 0) && LOG.isDebugEnabled()) {
- LOG.debug(val + " is a zero-length value");
- }
- }
- return val;
-
- }
-
- @SuppressWarnings("unchecked")
- private Object deserializeValue(Object val) throws IOException {
- return valDeserializer.deserialize(val);
- }
-
- /** Read the next key in the file into <code>key</code>, skipping its
- * value. True if another entry exists, and false at end of file. */
- public synchronized boolean next(Writable key) throws IOException {
- if (key.getClass() != getKeyClass())
- throw new IOException("wrong key class: "+key.getClass().getName()
- +" is not "+keyClass);
-
- if (!blockCompressed) {
- outBuf.reset();
-
- keyLength = next(outBuf);
- if (keyLength < 0)
- return false;
-
- valBuffer.reset(outBuf.getData(), outBuf.getLength());
-
- key.readFields(valBuffer);
- valBuffer.mark(0);
- if (valBuffer.getPosition() != keyLength)
- throw new IOException(key + " read " + valBuffer.getPosition()
- + " bytes, should read " + keyLength);
- } else {
- //Reset syncSeen
- syncSeen = false;
-
- if (noBufferedKeys == 0) {
- try {
- readBlock();
- } catch (EOFException eof) {
- return false;
- }
- }
-
- int keyLength = WritableUtils.readVInt(keyLenIn);
-
- // Sanity check
- if (keyLength < 0) {
- return false;
- }
-
- //Read another compressed 'key'
- key.readFields(keyIn);
- --noBufferedKeys;
- }
-
- return true;
+ public synchronized Object getCurrentValue(Object val) throws IOException {
+ return valueSerialization.deserialize(readUncompressedBytes(valueBytes),
+ val, conf);
}
/** Read the next key/value pair in the file into <code>key</code> and
* <code>val</code>. Returns true if such a pair exists and false when at
- * end of file */
- public synchronized boolean next(Writable key, Writable val)
- throws IOException {
- if (val.getClass() != getValueClass())
- throw new IOException("wrong value class: "+val+" is not "+valClass);
+ * end of file
+ * @deprecated Use {@link #next(Object)} and
+ * {@link #getCurrentValue(Object)} to iterate through keys and values.
+ */
+ @Deprecated
+ public synchronized boolean next(Writable key,
+ Writable val) throws IOException {
- boolean more = next(key);
-
- if (more) {
+ if (nextKey(key) == null) {
+ return false;
+ } else {
getCurrentValue(val);
+ return true;
}
-
- return more;
}
/**
@@ -2141,32 +2181,6 @@ public class SequenceFile {
return length;
}
- /** Read the next key/value pair in the file into <code>buffer</code>.
- * Returns the length of the key read, or -1 if at end of file. The length
- * of the value may be computed by calling buffer.getLength() before and
- * after calls to this method. */
- /** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
- @Deprecated
- synchronized int next(DataOutputBuffer buffer) throws IOException {
- // Unsupported for block-compressed sequence files
- if (blockCompressed) {
- throw new IOException("Unsupported call for block-compressed" +
- " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
- }
- try {
- int length = readRecordLength();
- if (length == -1) {
- return -1;
- }
- int keyLength = in.readInt();
- buffer.write(in, length);
- return keyLength;
- } catch (ChecksumException e) { // checksum failure
- handleChecksumException(e);
- return next(buffer);
- }
- }
-
public ValueBytes createValueBytes() {
ValueBytes val = null;
if (!decompress || blockCompressed) {
@@ -2178,14 +2192,15 @@ public class SequenceFile {
}
/**
- * Read 'raw' records.
+ * Read 'raw' records. Doesn't reset the key buffer. The new key appends
+ * on to the current contents.
* @param key - The buffer into which the key is read
- * @param val - The 'raw' value
+ * @param value - The 'raw' value
* @return Returns the total record length or -1 for end of file
* @throws IOException
*/
- public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val)
- throws IOException {
+ public synchronized int nextRaw(DataOutputBuffer key,
+ ValueBytes value) throws IOException {
if (!blockCompressed) {
int length = readRecordLength();
if (length == -1) {
@@ -2194,13 +2209,7 @@ public class SequenceFile {
int keyLength = in.readInt();
int valLength = length - keyLength;
key.write(in, keyLength);
- if (decompress) {
- CompressedBytes value = (CompressedBytes)val;
- value.reset(in, valLength);
- } else {
- UncompressedBytes value = (UncompressedBytes)val;
- value.reset(in, valLength);
- }
+ ((MutableValueBytes) value).reset(in, valLength);
return length;
} else {
@@ -2208,29 +2217,23 @@ public class SequenceFile {
syncSeen = false;
// Read 'key'
- if (noBufferedKeys == 0) {
+ if (noBufferedRecords == 0) {
if (in.getPos() >= end)
return -1;
- try {
- readBlock();
- } catch (EOFException eof) {
- return -1;
- }
+ readBlock();
}
int keyLength = WritableUtils.readVInt(keyLenIn);
if (keyLength < 0) {
throw new IOException("zero length key found!");
}
key.write(keyIn, keyLength);
- --noBufferedKeys;
+ --noBufferedRecords;
// Read raw 'value'
- seekToCurrentValue();
int valLength = WritableUtils.readVInt(valLenIn);
- UncompressedBytes rawValue = (UncompressedBytes)val;
+ UncompressedBytes rawValue = (UncompressedBytes)value;
rawValue.reset(valIn, valLength);
- --noBufferedValues;
return (keyLength+valLength);
}
@@ -2243,95 +2246,47 @@ public class SequenceFile {
* @return Returns the key length or -1 for end of file
* @throws IOException
*/
- public synchronized int nextRawKey(DataOutputBuffer key)
- throws IOException {
- if (!blockCompressed) {
- recordLength = readRecordLength();
- if (recordLength == -1) {
- return -1;
- }
- keyLength = in.readInt();
- key.write(in, keyLength);
- return keyLength;
- } else {
- //Reset syncSeen
- syncSeen = false;
-
- // Read 'key'
- if (noBufferedKeys == 0) {
- if (in.getPos() >= end)
- return -1;
-
- try {
- readBlock();
- } catch (EOFException eof) {
- return -1;
- }
- }
- int keyLength = WritableUtils.readVInt(keyLenIn);
- if (keyLength < 0) {
- throw new IOException("zero length key found!");
- }
- key.write(keyIn, keyLength);
- --noBufferedKeys;
-
- return keyLength;
- }
-
+ public synchronized int nextRawKey(DataOutputBuffer key) throws IOException{
+ key.reset();
+ return nextRaw(key, valueBytes);
}
- /** Read the next key in the file, skipping its
- * value. Return null at end of file. */
- public synchronized Object next(Object key) throws IOException {
- if (key != null && key.getClass() != getKeyClass()) {
- throw new IOException("wrong key class: "+key.getClass().getName()
- +" is not "+keyClass);
- }
-
- if (!blockCompressed) {
- outBuf.reset();
-
- keyLength = next(outBuf);
- if (keyLength < 0)
- return null;
-
- valBuffer.reset(outBuf.getData(), outBuf.getLength());
-
- key = deserializeKey(key);
- valBuffer.mark(0);
- if (valBuffer.getPosition() != keyLength)
- throw new IOException(key + " read " + valBuffer.getPosition()
- + " bytes, should read " + keyLength);
- } else {
- //Reset syncSeen
- syncSeen = false;
-
- if (noBufferedKeys == 0) {
- try {
- readBlock();
- } catch (EOFException eof) {
- return null;
- }
- }
-
- int keyLength = WritableUtils.readVInt(keyLenIn);
-
- // Sanity check
- if (keyLength < 0) {
- return null;
- }
-
- //Read another compressed 'key'
- key = deserializeKey(key);
- --noBufferedKeys;
- }
+ /**
+ * Read the next key in the file.
+ * The value is available via {@link #getCurrentValue}.
+ * @param key if not null, may be used to hold the next key
+ * @return true if a key was read, false if eof
+ * @throws IOException
+ * @deprecated Use {@link #nextKey} instead.
+ */
+ @Deprecated
+ public boolean next(Writable key) throws IOException {
+ return nextKey(key) != null;
+ }
- return key;
+ /**
+ * Read the next key from the file.
+ * @param key if not null, may be used to hold the next key
+ * @return the key that was read
+ * @throws IOException
+ * @deprecated Use {@link #nextKey} instead.
+ */
+ @Deprecated
+ public Object next(Object key) throws IOException {
+ return nextKey(key);
}
- @SuppressWarnings("unchecked")
- private Object deserializeKey(Object key) throws IOException {
- return keyDeserializer.deserialize(key);
+ /** Read the next key in the file.
+ * The value is available via {@link #getCurrentValue}.
+ */
+ public synchronized Object nextKey(Object key) throws IOException {
+ keyBuffer.reset();
+ int recordLen = nextRaw(keyBuffer, valueBytes);
+ if (recordLen < 0) {
+ return null;
+ }
+ serialBuffer.reset(keyBuffer.getData(), keyBuffer.getLength());
+ return keySerialization.deserialize(serialBuffer, key, conf);
}
/**
@@ -2340,31 +2295,9 @@ public class SequenceFile {
* @return Returns the value length
* @throws IOException
*/
- public synchronized int nextRawValue(ValueBytes val)
- throws IOException {
-
- // Position stream to current value
- seekToCurrentValue();
-
- if (!blockCompressed) {
- int valLength = recordLength - keyLength;
- if (decompress) {
- CompressedBytes value = (CompressedBytes)val;
- value.reset(in, valLength);
- } else {
- UncompressedBytes value = (UncompressedBytes)val;
- value.reset(in, valLength);
- }
-
- return valLength;
- } else {
- int valLength = WritableUtils.readVInt(valLenIn);
- UncompressedBytes rawValue = (UncompressedBytes)val;
- rawValue.reset(valIn, valLength);
- --noBufferedValues;
- return valLength;
- }
-
+ public synchronized int nextRawValue(ValueBytes val) throws IOException {
+ ((MutableValueBytes) val).set(valueBytes);
+ return val.getSize();
}
private void handleChecksumException(ChecksumException e)
@@ -2391,8 +2324,7 @@ public class SequenceFile {
public synchronized void seek(long position) throws IOException {
in.seek(position);
if (blockCompressed) { // trigger block read
- noBufferedKeys = 0;
- valuesDecompressed = true;
+ noBufferedRecords = 0;
}
}
@@ -2447,58 +2379,172 @@ public class SequenceFile {
}
- /** Sorts key/value pairs in a sequence-format file.
+ /** Sorts key/value pairs in a sequence-format file. This class is no longer
+ * used by Hadoop and will be removed in a later release.
*
- * <p>For best performance, applications should make sure that the {@link
- * Writable#readFields(DataInput)} implementation of their keys is
- * very efficient. In particular, it should avoid allocating memory.
+ * <p>For best performance, applications should make sure that the
+ * {@link RawComparator} that is used is efficient.
*/
+ @Deprecated
public static class Sorter {
- private RawComparator comparator;
+ private final RawComparator comparator;
+ private Writer.Option[] options;
+ private final Configuration conf;
+ private final FileContext fc;
+ private int memory; // bytes
+ private int factor; // merged per pass
+ private final Serialization<?> keySerialization;
+ private final Serialization<?> valueSerialization;
private MergeSort mergeSort; //the implementation of merge sort
private Path[] inFiles; // when merging or sorting
private Path outFile;
+
+ private CompressionType compressType;
+ private CompressionCodec compressCodec;
+
+ /**
+ * Look at the first input file's header to figure out the compression for
+ * the output.
+ * @throws IOException
+ */
+ private void setCompressionType() throws IOException {
+ if (inFiles == null || inFiles.length == 0) {
+ return;
+ }
+ Reader reader = new Reader(conf, Reader.file(inFiles[0]),
+ new Reader.OnlyHeaderOption());
+ compressType = reader.getCompressionType();
+ compressCodec = reader.getCompressionCodec();
+ reader.close();
+ }
- private int memory; // bytes
- private int factor; // merged per pass
+ public static interface Option extends Writer.Option { }
+
+ public static Option comparator(RawComparator value) {
+ return new ComparatorOption(value);
+ }
- private FileSystem fs = null;
+ private static class ComparatorOption extends Options.ComparatorOption
+ implements Option {
+ private ComparatorOption(RawComparator value) {
+ super(value);
+ }
+ }
- private Class keyClass;
- private Class valClass;
+ /**
+ * Create a Sorter.
+ * @param conf the configuration for the Sorter
+ * @param options the options controlling the sort, in particular the
+ * comparator that will sort the data and the options to write the
+ * output SequenceFiles. Since the bytes are not deserialized during the
+ * sort, the serialization for keys and values of the inputs must match
+ * the options for writing the SequenceFiles.
+ */
+ public Sorter(Configuration conf, Writer.Option... options ) {
+ this.options = options;
+ this.conf = conf;
+ this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;
+ this.factor = conf.getInt("io.sort.factor", 100);
+ try {
+ fc = FileContext.getFileContext(conf);
+ } catch (UnsupportedFileSystemException ex) {
+ throw new IllegalArgumentException("can't load default filesystem", ex);
+ }
+ ComparatorOption compareOpt = Options.getOption(ComparatorOption.class,
+ options);
+ keySerialization = getSerialization(Writer.KeySerialization.class,
+ Writer.KeyClassOption.class,
+ options);
+ valueSerialization = getSerialization(Writer.ValueSerialization.class,
+ Writer.ValueClassOption.class,
+ options);
+ if (compareOpt == null) {
+ comparator = keySerialization.getRawComparator();
+ } else {
+ comparator = compareOpt.getValue();
+ }
+ }
- private Configuration conf;
- private Metadata metadata;
-
- private Progressable progressable = null;
+ private
+ Serialization<?> getSerialization(Class<? extends Writer.Option> serialOpt,
+ Class<? extends Writer.Option> classOpt,
+ Writer.Option[] options) {
+ Options.SerializationOption serialOption = (Options.SerializationOption)
+ Options.getOption(serialOpt, options);
+ if (serialOption != null) {
+ return serialOption.getValue();
+ } else {
+ Options.ClassOption classOption = (Options.ClassOption)
+ Options.getOption(classOpt, options);
+ if (classOption == null) {
+ throw new IllegalArgumentException("Must specify either a " +
+ "serializer, or "
+ + "a class");
+ }
+ Class<?> cls = classOption.getValue();
+ return SerializationFactory.getInstance(conf).
+ getSerializationByType(cls);
+ }
+ }
- /** Sort and merge files containing the named classes. */
+ /**
+ * Check to ensure the serialization of the input files matches the
+ * serialization we are using for the output. If they are not, it would
+ * corrupt the outputs since we copy the keys and values as raw bytes.
+ * @param reader the reader for the input file
+ * @param filename the filename of the file
+ * @throws IllegalArgumentException if the serialization is wrong
+ */
+ private void checkSerialization(Reader reader,
+ Path filename) {
+ if (!reader.getKeySerialization().equals(keySerialization)) {
+ throw new IllegalArgumentException("key serialization of " +
+ filename +
+ " does not match output" +
+ " parameters");
+ }
+ if (!reader.getValueSerialization().equals(valueSerialization)) {
+ throw new IllegalArgumentException("value serialization of " +
+ filename +
+ " does not match output" +
+ " parameters");
+ }
+ }
+
+ /** Sort and merge files containing the named classes.
+ * @deprecated Use Sorter(Configuration, Option...) instead.
+ */
+ @SuppressWarnings("unchecked")
+ @Deprecated
public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass,
Class valClass, Configuration conf) {
- this(fs, WritableComparator.get(keyClass), keyClass, valClass, conf);
+ this(conf, Writer.keyClass(keyClass), Writer.valueClass(valClass));
}
- /** Sort and merge using an arbitrary {@link RawComparator}. */
+ /** Sort and merge using an arbitrary {@link RawComparator}.
+ * @deprecated Use Sorter(Configuration, Option...) instead.
+ */
+ @SuppressWarnings("unchecked")
+ @Deprecated
public Sorter(FileSystem fs, RawComparator comparator, Class keyClass,
Class valClass, Configuration conf) {
- this(fs, comparator, keyClass, valClass, conf, new Metadata());
+ this(conf, comparator(comparator), Writer.keyClass(keyClass),
+ Writer.valueClass(valClass));
}
- /** Sort and merge using an arbitrary {@link RawComparator}. */
+ /** Sort and merge using an arbitrary {@link RawComparator}.
+ * @deprecated Use Sorter(Configuration, Option...) instead.
+ */
+ @SuppressWarnings("unchecked")
+ @Deprecated
public Sorter(FileSystem fs, RawComparator comparator, Class keyClass,
Class valClass, Configuration conf, Metadata metadata) {
- this.fs = fs;
- this.comparator = comparator;
- this.keyClass = keyClass;
- this.valClass = valClass;
- this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;
- this.factor = conf.getInt("io.sort.factor", 100);
- this.conf = conf;
- this.metadata = metadata;
+ this(conf, comparator(comparator), Writer.keyClass(keyClass),
+ Writer.valueClass(valClass), Writer.metadata(metadata));
}
/** Set the number of streams to merge at once.*/
@@ -2513,9 +2559,13 @@ public class SequenceFile {
/** Get the total amount of buffer memory, in bytes.*/
public int getMemory() { return memory; }
- /** Set the progressable object in order to report progress. */
+ /** Set the progressable object in order to report progress.
+ * @deprecated the progressable should be set when the Sorter is created.
+ */
+ @Deprecated
public void setProgressable(Progressable progressable) {
- this.progressable = progressable;
+ options = Options.prependOptions(options,
+ Writer.progressable(progressable));
}
/**
@@ -2526,12 +2576,12 @@ public class SequenceFile {
*/
public void sort(Path[] inFiles, Path outFile,
boolean deleteInput) throws IOException {
- if (fs.exists(outFile)) {
+ if (fc.util().exists(outFile)) {
throw new IOException("already exists: " + outFile);
}
-
this.inFiles = inFiles;
this.outFile = outFile;
+ setCompressionType();
int segments = sortPass(deleteInput);
if (segments > 1) {
@@ -2549,10 +2599,12 @@ public class SequenceFile {
public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir,
boolean deleteInput) throws IOException {
Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
- if (fs.exists(outFile)) {
+ if (fc.util().exists(outFile)) {
throw new IOException("already exists: " + outFile);
}
this.inFiles = inFiles;
+ setCompressionType();
+
//outFile will basically be used as prefix for temp files in the cases
//where sort outputs multiple sorted segments. For the single segment
//case, the outputFile itself will contain the sorted data for that
@@ -2578,11 +2630,8 @@ public class SequenceFile {
}
private int sortPass(boolean deleteInput) throws IOException {
- if(LOG.isDebugEnabled()) {
- LOG.debug("running sort pass");
- }
+ LOG.debug("running sort pass");
SortPass sortPass = new SortPass(); // make the SortPass
- sortPass.setProgressable(progressable);
mergeSort = new MergeSort(sortPass.new SeqFileComparator());
try {
return sortPass.run(deleteInput); // run it
@@ -2604,30 +2653,22 @@ public class SequenceFile {
private int[] keyLengths = new int[keyOffsets.length];
private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length];
- private ArrayList segmentLengths = new ArrayList();
-
private Reader in = null;
private FSDataOutputStream out = null;
private FSDataOutputStream indexOut = null;
private Path outName;
- private Progressable progressable = null;
-
public int run(boolean deleteInput) throws IOException {
int segments = 0;
int currentFile = 0;
boolean atEof = (currentFile >= inFiles.length);
- CompressionType compressionType;
- CompressionCodec codec = null;
- segmentLengths.clear();
if (atEof) {
return 0;
}
// Initialize
- in = new Reader(fs, inFiles[currentFile], conf);
- compressionType = in.getCompressionType();
- codec = in.getCompressionCodec();
+ in = new Reader(conf, Reader.file(inFiles[currentFile]));
+ checkSerialization(in, inFiles[currentFile]);
for (int i=0; i < rawValues.length; ++i) {
rawValues[i] = null;
@@ -2642,21 +2683,24 @@ public class SequenceFile {
// Read a record into buffer
// Note: Attempt to re-use 'rawValue' as far as possible
- int keyOffset = rawKeys.getLength();
- ValueBytes rawValue =
- (count == keyOffsets.length || rawValues[count] == null) ?
- in.createValueBytes() :
- rawValues[count];
+ int keyOffset = rawKeys.getLength();
+ ValueBytes rawValue;
+ if (count == keyOffsets.length || rawValues[count] == null) {
+ rawValue = in.createValueBytes();
+ } else {
+ rawValue = rawValues[count];
+ }
int recordLength = in.nextRaw(rawKeys, rawValue);
if (recordLength == -1) {
in.close();
if (deleteInput) {
- fs.delete(inFiles[currentFile], true);
+ fc.delete(inFiles[currentFile], true);
}
currentFile += 1;
atEof = currentFile >= inFiles.length;
if (!atEof) {
- in = new Reader(fs, inFiles[currentFile], conf);
+ in = new Reader(conf, Reader.file(inFiles[currentFile]));
+ checkSerialization(in, inFiles[currentFile]);
} else {
in = null;
}
@@ -2678,17 +2722,10 @@ public class SequenceFile {
}
// buffer is full -- sort & flush it
- if(LOG.isDebugEnabled()) {
- LOG.debug("flushing segment " + segments);
- }
+ LOG.debug("flushing segment " + segments);
rawBuffer = rawKeys.getData();
sort(count);
- // indicate we're making progress
- if (progressable != null) {
- progressable.progress();
- }
- flush(count, bytesProcessed, compressionType, codec,
- segments==0 && atEof);
+ flush(count, bytesProcessed, segments==0 && atEof);
segments++;
}
return segments;
@@ -2731,22 +2768,23 @@ public class SequenceFile {
}
private void flush(int count, int bytesProcessed,
- CompressionType compressionType,
- CompressionCodec codec,
boolean done) throws IOException {
if (out == null) {
outName = done ? outFile : outFile.suffix(".0");
- out = fs.create(outName);
+ out = fc.create(outName, EnumSet.of(CreateFlag.CREATE));
if (!done) {
- indexOut = fs.create(outName.suffix(".index"));
+ indexOut = fc.create(outName.suffix(".index"),
+ EnumSet.of(CreateFlag.CREATE));
}
}
long segmentStart = out.getPos();
- Writer writer = createWriter(conf, Writer.stream(out),
- Writer.keyClass(keyClass), Writer.valueClass(valClass),
- Writer.compression(compressionType, codec),
- Writer.metadata(done ? metadata : new Metadata()));
+ Writer writer =
+ createWriter(conf,
+ Options.prependOptions(options,
+ Writer.stream(out),
+ Writer.compression(compressType,
+ compressCodec)));
if (!done) {
writer.sync = null; // disable sync on temp files
@@ -2778,12 +2816,6 @@ public class SequenceFile {
}
}
- /** set the progressable object in order to report progress */
- public void setProgressable(Progressable progressable)
- {
- this.progressable = progressable;
- }
-
} // SequenceFile.Sorter.SortPass
/** The interface to iterate over raw keys/values of SequenceFiles. */
@@ -2824,7 +2856,7 @@ public class SequenceFile {
Path tmpDir)
throws IOException {
// pass in object to report progress, if present
- MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable);
+ MergeQueue mQueue = new MergeQueue(segments, tmpDir);
return mQueue.merge();
}
@@ -2863,13 +2895,13 @@ public class SequenceFile {
ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
for (int i = 0; i < inNames.length; i++) {
SegmentDescriptor s = new SegmentDescriptor(0,
- fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
+ fc.getFileStatus(inNames[i]).getLen(), inNames[i]);
s.preserveInput(!deleteInputs);
s.doSync();
a.add(s);
}
this.factor = factor;
- MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable);
+ MergeQueue mQueue = new MergeQueue(a, tmpDir);
return mQueue.merge();
}
@@ -2892,46 +2924,17 @@ public class SequenceFile {
ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
for (int i = 0; i < inNames.length; i++) {
SegmentDescriptor s = new SegmentDescriptor(0,
- fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
+ fc.getFileStatus(inNames[i]).getLen(), inNames[i]);
s.preserveInput(!deleteInputs);
s.doSync();
a.add(s);
}
factor = (inNames.length < factor) ? inNames.length : factor;
- // pass in object to report progress, if present
- MergeQueue mQueue = new MergeQueue(a, tempDir, progressable);
+ MergeQueue mQueue = new MergeQueue(a, tempDir);
return mQueue.merge();
}
/**
- * Clones the attributes (like compression of the input file and creates a
- * corresponding Writer
- * @param inputFile the path of the input file whose attributes should be
- * cloned
- * @param outputFile the path of the output file
- * @param prog the Progressable to report status during the file write
- * @return Writer
- * @throws IOException
- */
- public Writer cloneFileAttributes(Path inputFile, Path outputFile,
- Progressable prog) throws IOException {
- Reader reader = new Reader(conf,
- Reader.file(inputFile),
- new Reader.OnlyHeaderOption());
- CompressionType compress = reader.getCompressionType();
- CompressionCodec codec = reader.getCompressionCodec();
- reader.close();
-
- Writer writer = createWriter(conf,
- Writer.file(outputFile),
- Writer.keyClass(keyClass),
- Writer.valueClass(valClass),
- Writer.compression(compress, codec),
- Writer.progressable(prog));
- return writer;
- }
-
- /**
* Writes records from RawKeyValueIterator into a file represented by the
* passed writer
* @param records the RawKeyValueIterator
@@ -2953,12 +2956,17 @@ public class SequenceFile {
* @throws IOException
*/
public void merge(Path[] inFiles, Path outFile) throws IOException {
- if (fs.exists(outFile)) {
+ if (fc.util().exists(outFile)) {
throw new IOException("already exists: " + outFile);
}
+ this.inFiles = inFiles;
+ setCompressionType();
RawKeyValueIterator r = merge(inFiles, false, outFile.getParent());
- Writer writer = cloneFileAttributes(inFiles[0], outFile, null);
-
+ Writer writer =
+ createWriter(conf, Options.prependOptions
+ (options,
+ Writer.file(outFile),
+ Writer.compression(compressType, compressCodec)));
writeFile(r, writer);
writer.close();
@@ -2966,11 +2974,11 @@ public class SequenceFile {
/** sort calls this to generate the final merged output */
private int mergePass(Path tmpDir) throws IOException {
- if(LOG.isDebugEnabled()) {
- LOG.debug("running merge pass");
- }
- Writer writer = cloneFileAttributes(
- outFile.suffix(".0"), outFile, null);
+ LOG.debug("running merge pass");
+ Writer writer =
+ createWriter(conf, Options.prependOptions
+ (options, Writer.file(outFile),
+ Writer.compression(compressType, compressCodec)));
RawKeyValueIterator r = merge(outFile.suffix(".0"),
outFile.suffix(".0.index"), tmpDir);
writeFile(r, writer);
@@ -2994,12 +3002,12 @@ public class SequenceFile {
//the contained segments during the merge process & hence don't need
//them anymore
SegmentContainer container = new SegmentContainer(inName, indexIn);
- MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir, progressable);
+ MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir);
return mQueue.merge();
}
/** This class implements the core of the merge logic */
- private class MergeQueue extends PriorityQueue
+ private class MergeQueue extends PriorityQueue<SegmentDescriptor>
implements RawKeyValueIterator {
private boolean compress;
private boolean blockCompress;
@@ -3009,7 +3017,6 @@ public class SequenceFile {
private float progPerByte;
private Progress mergeProgress = new Progress();
private Path tmpDir;
- private Progressable progress = null; //handle to the progress reporting object
private SegmentDescriptor minSegment;
//a TreeMap used to store the segments sorted by size (segment offset and
@@ -3017,8 +3024,7 @@ public class SequenceFile {
private Map<SegmentDescriptor, Void> sortedSegmentSizes =
new TreeMap<SegmentDescriptor, Void>();
- @SuppressWarnings("unchecked")
- public void put(SegmentDescriptor stream) throws IOException {
+ public void addSegment(SegmentDescriptor stream) throws IOException {
if (size() == 0) {
compress = stream.in.isCompressed();
blockCompress = stream.in.isBlockCompressed();
@@ -3026,29 +3032,23 @@ public class SequenceFile {
blockCompress != stream.in.isBlockCompressed()) {
throw new IOException("All merged files must be compressed or not.");
}
- super.put(stream);
+ put(stream);
}
/**
* A queue of file segments to merge
* @param segments the file segments to merge
* @param tmpDir a relative local directory to save intermediate files in
- * @param progress the reference to the Progressable object
*/
public MergeQueue(List <SegmentDescriptor> segments,
- Path tmpDir, Progressable progress) {
+ Path tmpDir) {
int size = segments.size();
for (int i = 0; i < size; i++) {
sortedSegmentSizes.put(segments.get(i), null);
}
this.tmpDir = tmpDir;
- this.progress = progress;
}
protected boolean lessThan(Object a, Object b) {
- // indicate we're making progress
- if (progress != null) {
- progress.progress();
- }
SegmentDescriptor msa = (SegmentDescriptor)a;
SegmentDescriptor msb = (SegmentDescriptor)b;
return comparator.compare(msa.getKey().getData(), 0,
@@ -3167,7 +3167,7 @@ public class SequenceFile {
//feed the streams to the priority queue
initialize(segmentsToMerge.size()); clear();
for (int i = 0; i < segmentsToMerge.size(); i++) {
- put(segmentsToMerge.get(i));
+ addSegment(segmentsToMerge.get(i));
}
//if we have lesser number of segments remaining, then just return the
//iterator, else do another single level merge
@@ -3198,12 +3198,14 @@ public class SequenceFile {
Path outputFile = lDirAlloc.getLocalPathForWrite(
tmpFilename.toString(),
approxOutputSize, conf);
- if(LOG.isDebugEnabled()) {
- LOG.debug("writing intermediate results to " + outputFile);
- }
- Writer writer = cloneFileAttributes(
- fs.makeQualified(segmentsToMerge.get(0).segmentPathName),
- fs.makeQualified(outputFile), null);
+ LOG.debug("writing intermediate results to " + outputFile);
+ Writer writer =
+ createWriter(conf,
+ Options.prependOptions
+ (options,
+ Writer.file(outputFile),
+ Writer.compression(compressType,
+ compressCodec)));
writer.sync = null; //disable sync for temp files
writeFile(this, writer);
writer.close();
@@ -3214,7 +3216,7 @@ public class SequenceFile {
SegmentDescriptor tempSegment =
new SegmentDescriptor(0,
- fs.getFileStatus(outputFile).getLen(), outputFile);
+ fc.getFileStatus(outputFile).getLen(), outputFile);
//put the segment back in the TreeMap
sortedSegmentSizes.put(tempSegment, null);
numSegments = sortedSegmentSizes.size();
@@ -3244,7 +3246,8 @@ public class SequenceFile {
numDescriptors = sortedSegmentSizes.size();
SegmentDescriptor[] SegmentDescriptors =
new SegmentDescriptor[numDescriptors];
- Iterator iter = sortedSegmentSizes.keySet().iterator();
+ Iterator<SegmentDescriptor> iter =
+ sortedSegmentSizes.keySet().iterator();
int i = 0;
while (i < numDescriptors) {
SegmentDescriptors[i++] = (SegmentDescriptor)iter.next();
@@ -3258,7 +3261,7 @@ public class SequenceFile {
* provide a customized cleanup method implementation. In this
* implementation, cleanup closes the file handle and deletes the file
*/
- public class SegmentDescriptor implements Comparable {
+ public class SegmentDescriptor implements Comparable<SegmentDescriptor> {
long segmentOffset; //the start of the segment in the file
long segmentLength; //the length of the segment
@@ -3292,8 +3295,8 @@ public class SequenceFile {
return preserveInput;
}
- public int compareTo(Object o) {
- SegmentDescriptor that = (SegmentDescriptor)o;
+ @Override
+ public int compareTo(SegmentDescriptor that) {
if (this.segmentLength != that.segmentLength) {
return (this.segmentLength < that.segmentLength ? -1 : 1);
}
@@ -3329,24 +3332,16 @@ public class SequenceFile {
public boolean nextRawKey() throws IOException {
if (in == null) {
int bufferSize = getBufferSize(conf);
- if (fs.getUri().getScheme().startsWith("ramfs")) {
- bufferSize = conf.getInt("io.bytes.per.checksum", 512);
- }
Reader reader = new Reader(conf,
Reader.file(segmentPathName),
Reader.bufferSize(bufferSize),
Reader.start(segmentOffset),
Reader.length(segmentLength));
+ checkSerialization(reader, segmentPathName);
//sometimes we ignore syncs especially for temp merge files
if (ignoreSync) reader.ignoreSync();
- if (reader.getKeyClass() != keyClass)
- throw new IOException("wrong key class: " + reader.getKeyClass() +
- " is not " + keyClass);
- if (reader.getValueClass() != valClass)
- throw new IOException("wrong value class: "+reader.getValueClass()+
- " is not " + valClass);
this.in = reader;
rawKey = new DataOutputBuffer();
}
@@ -3384,7 +3379,7 @@ public class SequenceFile {
public void cleanup() throws IOException {
close();
if (!preserveInput) {
- fs.delete(segmentPathName, true);
+ fc.delete(segmentPathName, true);
}
}
} // SequenceFile.Sorter.SegmentDescriptor
@@ -3439,8 +3434,8 @@ public class SequenceFile {
* generates a single output file with an associated index file */
public SegmentContainer(Path inName, Path indexIn) throws IOException {
//get the segments from indexIn
- FSDataInputStream fsIndexIn = fs.open(indexIn);
- long end = fs.getFileStatus(indexIn).getLen();
+ FSDataInputStream fsIndexIn = fc.open(indexIn);
+ long end = fc.getFileStatus(indexIn).getLen();
while (fsIndexIn.getPos() < end) {
long segmentOffset = WritableUtils.readVLong(fsIndexIn);
long segmentLength = WritableUtils.readVLong(fsIndexIn);
@@ -3449,7 +3444,7 @@ public class SequenceFile {
segmentLength, segmentName, this));
}
fsIndexIn.close();
- fs.delete(indexIn, true);
+ fc.delete(indexIn, true);
numSegmentsContained = segments.size();
this.inName = inName;
}
@@ -3460,7 +3455,7 @@ public class SequenceFile {
public void cleanup() throws IOException {
numSegmentsCleanedUp++;
[... 7 lines stripped ...]