You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2009/01/01 01:13:06 UTC
svn commit: r730492 [2/3] - in /hadoop/hbase/branches/0.19_on_hadoop_0.18:
./ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/
src/java/org/apache/hadoop/hbase/io/ src/java/org/apache/hadoop/hbase/ipc/
src/java/org/apache/hado...
Added: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/io/SequenceFile.java?rev=730492&view=auto
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/io/SequenceFile.java (added)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/io/SequenceFile.java Wed Dec 31 16:13:06 2008
@@ -0,0 +1,3213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import java.io.*;
+import java.util.*;
+import java.rmi.server.UID;
+import java.security.MessageDigest;
+import org.apache.commons.logging.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.VersionMismatchException;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableName;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.zlib.ZlibFactory;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.hadoop.util.MergeSort;
+import org.apache.hadoop.util.PriorityQueue;
+
+/**
+ * <code>SequenceFile</code>s are flat files consisting of binary key/value
+ * pairs.
+ *
+ * <p>This is a copy of Hadoop's SequenceFile, brought local so we can fix bugs;
+ * e.g. hbase-1097</p>
+ *
+ * <p><code>SequenceFile</code> provides {@link Writer}, {@link Reader} and
+ * {@link Sorter} classes for writing, reading and sorting respectively.</p>
+ *
+ * There are three <code>SequenceFile</code> <code>Writer</code>s based on the
+ * {@link CompressionType} used to compress key/value pairs:
+ * <ol>
+ * <li>
+ * <code>Writer</code> : Uncompressed records.
+ * </li>
+ * <li>
+ * <code>RecordCompressWriter</code> : Record-compressed files, only compress
+ * values.
+ * </li>
+ * <li>
+ * <code>BlockCompressWriter</code> : Block-compressed files, both keys &
+ * values are collected in 'blocks'
+ * separately and compressed. The size of
+ * the 'block' is configurable.
+ * </ol>
+ *
+ * <p>The actual compression algorithm used to compress key and/or values can be
+ * specified by using the appropriate {@link CompressionCodec}.</p>
+ *
+ * <p>The recommended way is to use the static <tt>createWriter</tt> methods
+ * provided by the <code>SequenceFile</code> to choose the preferred format.</p>
+ *
+ * <p>The {@link Reader} acts as the bridge and can read any of the above
+ * <code>SequenceFile</code> formats.</p>
+ *
+ * <h4 id="Formats">SequenceFile Formats</h4>
+ *
+ * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s
+ * depending on the <code>CompressionType</code> specified. All of them share a
+ * <a href="#Header">common header</a> described below.
+ *
+ * <h5 id="Header">SequenceFile Header</h5>
+ * <ul>
+ * <li>
+ * version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual
+ * version number (e.g. SEQ4 or SEQ6)
+ * </li>
+ * <li>
+ * keyClassName - key class
+ * </li>
+ * <li>
+ * valueClassName - value class
+ * </li>
+ * <li>
+ * compression - A boolean which specifies if compression is turned on for
+ * keys/values in this file.
+ * </li>
+ * <li>
+ * blockCompression - A boolean which specifies if block-compression is
+ * turned on for keys/values in this file.
+ * </li>
+ * <li>
+ * compression codec - <code>CompressionCodec</code> class which is used for
+ * compression of keys and/or values (if compression is
+ * enabled).
+ * </li>
+ * <li>
+ * metadata - {@link Metadata} for this file.
+ * </li>
+ * <li>
+ * sync - A sync marker to denote end of the header.
+ * </li>
+ * </ul>
+ *
+ * <h5 id="UncompressedFormat">Uncompressed SequenceFile Format</h5>
+ * <ul>
+ * <li>
+ * <a href="#Header">Header</a>
+ * </li>
+ * <li>
+ * Record
+ * <ul>
+ * <li>Record length</li>
+ * <li>Key length</li>
+ * <li>Key</li>
+ * <li>Value</li>
+ * </ul>
+ * </li>
+ * <li>
+ * A sync-marker every few <code>100</code> bytes or so.
+ * </li>
+ * </ul>
+ *
+ * <h5 id="RecordCompressedFormat">Record-Compressed SequenceFile Format</h5>
+ * <ul>
+ * <li>
+ * <a href="#Header">Header</a>
+ * </li>
+ * <li>
+ * Record
+ * <ul>
+ * <li>Record length</li>
+ * <li>Key length</li>
+ * <li>Key</li>
+ * <li><i>Compressed</i> Value</li>
+ * </ul>
+ * </li>
+ * <li>
+ * A sync-marker every few <code>100</code> bytes or so.
+ * </li>
+ * </ul>
+ *
+ * <h5 id="BlockCompressedFormat">Block-Compressed SequenceFile Format</h5>
+ * <ul>
+ * <li>
+ * <a href="#Header">Header</a>
+ * </li>
+ * <li>
+ * Record <i>Block</i>
+ * <ul>
+ * <li>Compressed key-lengths block-size</li>
+ * <li>Compressed key-lengths block</li>
+ * <li>Compressed keys block-size</li>
+ * <li>Compressed keys block</li>
+ * <li>Compressed value-lengths block-size</li>
+ * <li>Compressed value-lengths block</li>
+ * <li>Compressed values block-size</li>
+ * <li>Compressed values block</li>
+ * </ul>
+ * </li>
+ * <li>
+ * A sync-marker every few <code>100</code> bytes or so.
+ * </li>
+ * </ul>
+ *
+ * <p>The compressed blocks of key lengths and value lengths consist of the
+ * actual lengths of individual keys/values encoded in ZeroCompressedInteger
+ * format.</p>
+ *
+ * @see CompressionCodec
+ */
+public class SequenceFile {
+ private static final Log LOG = LogFactory.getLog(SequenceFile.class);
+
+ private SequenceFile() {} // no public ctor
+
+ private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
+ private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
+ private static final byte VERSION_WITH_METADATA = (byte)6;
+ private static byte[] VERSION = new byte[] {
+ (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
+ };
+
+ private static final int SYNC_ESCAPE = -1; // "length" of sync entries
+ private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash
+ private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash
+
+ /** The number of bytes between sync points.*/
+ public static final int SYNC_INTERVAL = 100*SYNC_SIZE;
+
+ /**
+  * Selects how a {@link SequenceFile} compresses its key/value pairs.
+  * The declaration order of the constants is significant and must not change.
+  *
+  * @see SequenceFile.Writer
+  */
+ public static enum CompressionType {
+   /** Records are stored exactly as serialized, with no compression. */
+   NONE,
+   /** Every value is compressed on its own; keys stay uncompressed. */
+   RECORD,
+   /** Runs of records are gathered into blocks that are compressed together. */
+   BLOCK;
+ }
+
+ /**
+  * Get the compression type for the reduce outputs.
+  * The value of <code>io.seqfile.compression.type</code> is trimmed and
+  * matched case-insensitively against the {@link CompressionType} constants,
+  * so both "block" and "BLOCK" are accepted.
+  * @param job the job config to look in
+  * @return the kind of compression to use; {@link CompressionType#RECORD}
+  *         when the property is not set
+  * @throws IllegalArgumentException if the property names no known type
+  * @deprecated Use
+  * {@link org.apache.hadoop.mapred.SequenceFileOutputFormat#getOutputCompressionType(org.apache.hadoop.mapred.JobConf)}
+  * to get {@link CompressionType} for job-outputs.
+  */
+ @Deprecated
+ static public CompressionType getCompressionType(Configuration job) {
+   String name = job.get("io.seqfile.compression.type");
+   if (name == null) {
+     return CompressionType.RECORD;
+   }
+   // The setter's documentation lists the values in lower case
+   // ("none, block, record"), while Enum.valueOf is case-sensitive.
+   return CompressionType.valueOf(name.trim().toUpperCase(Locale.ENGLISH));
+ }
+
+ /**
+  * Set the compression type for sequence files.
+  * The type is stored under <code>io.seqfile.compression.type</code> as the
+  * constant's name (e.g. "BLOCK").
+  * @param job the configuration to modify
+  * @param val the new compression type (none, block, record)
+  * @deprecated Use one of the many SequenceFile.createWriter methods to specify
+  * the {@link CompressionType} while creating the {@link SequenceFile}, or
+  * {@link org.apache.hadoop.mapred.SequenceFileOutputFormat#setOutputCompressionType(org.apache.hadoop.mapred.JobConf, org.apache.hadoop.io.SequenceFile.CompressionType)}
+  * to specify the compression type for job-outputs (note that this method
+  * takes Hadoop's own <code>SequenceFile.CompressionType</code>).
+  */
+ @Deprecated
+ static public void setCompressionType(Configuration job,
+     CompressionType val) {
+   job.set("io.seqfile.compression.type", val.toString());
+ }
+
+ /**
+  * Construct the preferred type of SequenceFile Writer, taking the
+  * compression type from <code>io.seqfile.compression.type</code> in
+  * <code>conf</code> (see {@link #getCompressionType(Configuration)}).
+  * @param fs The configured filesystem.
+  * @param conf The configuration.
+  * @param name The name of the file.
+  * @param keyClass The 'key' type.
+  * @param valClass The 'value' type.
+  * @return Returns the handle to the constructed SequenceFile Writer.
+  * @throws IOException
+  */
+ public static Writer
+ createWriter(FileSystem fs, Configuration conf, Path name,
+     Class keyClass, Class valClass)
+     throws IOException {
+   CompressionType configuredType = getCompressionType(conf);
+   return createWriter(fs, conf, name, keyClass, valClass, configuredType);
+ }
+
+ /**
+  * Construct the preferred type of SequenceFile Writer with a
+  * {@link DefaultCodec}, no progress reporter and empty metadata. The buffer
+  * size comes from <code>io.file.buffer.size</code> in the filesystem's
+  * configuration (4096 if unset); replication and block size are the
+  * filesystem defaults.
+  * @param fs The configured filesystem.
+  * @param conf The configuration.
+  * @param name The name of the file.
+  * @param keyClass The 'key' type.
+  * @param valClass The 'value' type.
+  * @param compressionType The compression type.
+  * @return Returns the handle to the constructed SequenceFile Writer.
+  * @throws IOException
+  */
+ public static Writer
+ createWriter(FileSystem fs, Configuration conf, Path name,
+     Class keyClass, Class valClass, CompressionType compressionType)
+     throws IOException {
+   int bufferSize = fs.getConf().getInt("io.file.buffer.size", 4096);
+   short replication = fs.getDefaultReplication();
+   long blockSize = fs.getDefaultBlockSize();
+   return createWriter(fs, conf, name, keyClass, valClass,
+       bufferSize, replication, blockSize,
+       compressionType, new DefaultCodec(), null, new Metadata());
+ }
+
+ /**
+  * Construct the preferred type of SequenceFile Writer with a
+  * {@link DefaultCodec}, empty metadata and a progress reporter. The buffer
+  * size comes from <code>io.file.buffer.size</code> in the filesystem's
+  * configuration (4096 if unset); replication and block size are the
+  * filesystem defaults.
+  * @param fs The configured filesystem.
+  * @param conf The configuration.
+  * @param name The name of the file.
+  * @param keyClass The 'key' type.
+  * @param valClass The 'value' type.
+  * @param compressionType The compression type.
+  * @param progress The Progressable object to track progress.
+  * @return Returns the handle to the constructed SequenceFile Writer.
+  * @throws IOException
+  */
+ public static Writer
+ createWriter(FileSystem fs, Configuration conf, Path name,
+     Class keyClass, Class valClass, CompressionType compressionType,
+     Progressable progress) throws IOException {
+   int bufferSize = fs.getConf().getInt("io.file.buffer.size", 4096);
+   short replication = fs.getDefaultReplication();
+   long blockSize = fs.getDefaultBlockSize();
+   return createWriter(fs, conf, name, keyClass, valClass,
+       bufferSize, replication, blockSize,
+       compressionType, new DefaultCodec(), progress, new Metadata());
+ }
+
+ /**
+  * Construct the preferred type of SequenceFile Writer with an explicit
+  * codec, no progress reporter and empty metadata. The buffer size comes
+  * from <code>io.file.buffer.size</code> in the filesystem's configuration
+  * (4096 if unset); replication and block size are the filesystem defaults.
+  * @param fs The configured filesystem.
+  * @param conf The configuration.
+  * @param name The name of the file.
+  * @param keyClass The 'key' type.
+  * @param valClass The 'value' type.
+  * @param compressionType The compression type.
+  * @param codec The compression codec.
+  * @return Returns the handle to the constructed SequenceFile Writer.
+  * @throws IOException
+  */
+ public static Writer
+ createWriter(FileSystem fs, Configuration conf, Path name,
+     Class keyClass, Class valClass,
+     CompressionType compressionType, CompressionCodec codec)
+     throws IOException {
+   int bufferSize = fs.getConf().getInt("io.file.buffer.size", 4096);
+   short replication = fs.getDefaultReplication();
+   long blockSize = fs.getDefaultBlockSize();
+   return createWriter(fs, conf, name, keyClass, valClass,
+       bufferSize, replication, blockSize,
+       compressionType, codec, null, new Metadata());
+ }
+
+ /**
+  * Construct the preferred type of SequenceFile Writer with an explicit
+  * codec, progress reporter and metadata. The buffer size comes from
+  * <code>io.file.buffer.size</code> in the filesystem's configuration
+  * (4096 if unset); replication and block size are the filesystem defaults.
+  * @param fs The configured filesystem.
+  * @param conf The configuration.
+  * @param name The name of the file.
+  * @param keyClass The 'key' type.
+  * @param valClass The 'value' type.
+  * @param compressionType The compression type.
+  * @param codec The compression codec.
+  * @param progress The Progressable object to track progress.
+  * @param metadata The metadata of the file.
+  * @return Returns the handle to the constructed SequenceFile Writer.
+  * @throws IOException
+  */
+ public static Writer
+ createWriter(FileSystem fs, Configuration conf, Path name,
+     Class keyClass, Class valClass,
+     CompressionType compressionType, CompressionCodec codec,
+     Progressable progress, Metadata metadata) throws IOException {
+   int bufferSize = fs.getConf().getInt("io.file.buffer.size", 4096);
+   short replication = fs.getDefaultReplication();
+   long blockSize = fs.getDefaultBlockSize();
+   return createWriter(fs, conf, name, keyClass, valClass,
+       bufferSize, replication, blockSize,
+       compressionType, codec, progress, metadata);
+ }
+
+ /**
+  * Construct the preferred type of SequenceFile Writer; every setting is
+  * explicit. The writer class is chosen from <code>compressionType</code>:
+  * {@link Writer} for NONE, RecordCompressWriter for RECORD and
+  * BlockCompressWriter for BLOCK.
+  * @param fs The configured filesystem.
+  * @param conf The configuration.
+  * @param name The name of the file.
+  * @param keyClass The 'key' type.
+  * @param valClass The 'value' type.
+  * @param bufferSize buffer size for the underlaying outputstream.
+  * @param replication replication factor for the file.
+  * @param blockSize block size for the file.
+  * @param compressionType The compression type.
+  * @param codec The compression codec.
+  * @param progress The Progressable object to track progress.
+  * @param metadata The metadata of the file.
+  * @return Returns the handle to the constructed SequenceFile Writer, or
+  *         null if <code>compressionType</code> is null.
+  * @throws IOException
+  * @throws IllegalArgumentException if <code>codec</code> is a GzipCodec
+  *         and native hadoop/zlib code is not loaded
+  */
+ public static Writer
+ createWriter(FileSystem fs, Configuration conf, Path name,
+     Class keyClass, Class valClass, int bufferSize,
+     short replication, long blockSize,
+     CompressionType compressionType, CompressionCodec codec,
+     Progressable progress, Metadata metadata) throws IOException {
+   // Reject GzipCodec unless native hadoop or native zlib code is loaded.
+   boolean gzipWithoutNative = codec instanceof GzipCodec
+       && !NativeCodeLoader.isNativeCodeLoaded()
+       && !ZlibFactory.isNativeZlibLoaded(conf);
+   if (gzipWithoutNative) {
+     throw new IllegalArgumentException("SequenceFile doesn't work with " +
+         "GzipCodec without native-hadoop code!");
+   }
+
+   if (compressionType == CompressionType.NONE) {
+     return new Writer(fs, conf, name, keyClass, valClass,
+         bufferSize, replication, blockSize, progress, metadata);
+   }
+   if (compressionType == CompressionType.RECORD) {
+     return new RecordCompressWriter(fs, conf, name, keyClass, valClass,
+         bufferSize, replication, blockSize, codec, progress, metadata);
+   }
+   if (compressionType == CompressionType.BLOCK) {
+     return new BlockCompressWriter(fs, conf, name, keyClass, valClass,
+         bufferSize, replication, blockSize, codec, progress, metadata);
+   }
+   // Reached only when compressionType is null.
+   return null;
+ }
+
+ /**
+  * Construct the preferred type of SequenceFile Writer with an explicit
+  * codec and progress reporter and with empty metadata.
+  * @param fs The configured filesystem.
+  * @param conf The configuration.
+  * @param name The name of the file.
+  * @param keyClass The 'key' type.
+  * @param valClass The 'value' type.
+  * @param compressionType The compression type.
+  * @param codec The compression codec.
+  * @param progress The Progressable object to track progress.
+  * @return Returns the handle to the constructed SequenceFile Writer.
+  * @throws IOException
+  */
+ public static Writer
+ createWriter(FileSystem fs, Configuration conf, Path name,
+     Class keyClass, Class valClass,
+     CompressionType compressionType, CompressionCodec codec,
+     Progressable progress) throws IOException {
+   return createWriter(fs, conf, name, keyClass, valClass,
+       compressionType, codec, progress, new Metadata());
+ }
+
+ /**
+  * Construct the preferred type of 'raw' SequenceFile Writer on top of an
+  * existing stream, choosing the writer class from two flags.
+  * @param conf The configuration.
+  * @param out The stream on top which the writer is to be constructed.
+  * @param keyClass The 'key' type.
+  * @param valClass The 'value' type.
+  * @param compress Compress data? When false, <code>blockCompress</code> is ignored.
+  * @param blockCompress Compress blocks? Only consulted when <code>compress</code> is true.
+  * @param codec The compression codec; may be null.
+  * @param metadata The metadata of the file.
+  * @return Returns the handle to the constructed SequenceFile Writer.
+  * @throws IOException
+  * @throws IllegalArgumentException if <code>codec</code> is a GzipCodec
+  *         and native hadoop/zlib code is not loaded
+  */
+ private static Writer
+ createWriter(Configuration conf, FSDataOutputStream out,
+     Class keyClass, Class valClass, boolean compress, boolean blockCompress,
+     CompressionCodec codec, Metadata metadata)
+     throws IOException {
+   // instanceof is false for null, so a null codec never trips this check.
+   boolean gzipWithoutNative = codec instanceof GzipCodec
+       && !NativeCodeLoader.isNativeCodeLoaded()
+       && !ZlibFactory.isNativeZlibLoaded(conf);
+   if (gzipWithoutNative) {
+     throw new IllegalArgumentException("SequenceFile doesn't work with " +
+         "GzipCodec without native-hadoop code!");
+   }
+
+   if (!compress) {
+     return new Writer(conf, out, keyClass, valClass, metadata);
+   }
+   if (blockCompress) {
+     return new BlockCompressWriter(conf, out, keyClass, valClass, codec, metadata);
+   }
+   return new RecordCompressWriter(conf, out, keyClass, valClass, codec, metadata);
+ }
+
+ /**
+  * Construct the preferred type of 'raw' SequenceFile Writer for a named
+  * file, choosing the writer class from two flags.
+  * @param fs The configured filesystem.
+  * @param conf The configuration.
+  * @param file The name of the file.
+  * @param keyClass The 'key' type.
+  * @param valClass The 'value' type.
+  * @param compress Compress data? When false, <code>blockCompress</code> is ignored.
+  * @param blockCompress Compress blocks? Only consulted when <code>compress</code> is true.
+  * @param codec The compression codec; may be null.
+  * @param progress The Progressable object to track progress.
+  * @param metadata The metadata of the file.
+  * @return Returns the handle to the constructed SequenceFile Writer.
+  * @throws IOException
+  * @throws IllegalArgumentException if <code>codec</code> is a GzipCodec
+  *         and native hadoop/zlib code is not loaded
+  */
+ private static Writer
+ createWriter(FileSystem fs, Configuration conf, Path file,
+     Class keyClass, Class valClass,
+     boolean compress, boolean blockCompress,
+     CompressionCodec codec, Progressable progress, Metadata metadata)
+     throws IOException {
+   // instanceof is false for null, so a null codec never trips this check.
+   boolean gzipWithoutNative = codec instanceof GzipCodec
+       && !NativeCodeLoader.isNativeCodeLoaded()
+       && !ZlibFactory.isNativeZlibLoaded(conf);
+   if (gzipWithoutNative) {
+     throw new IllegalArgumentException("SequenceFile doesn't work with " +
+         "GzipCodec without native-hadoop code!");
+   }
+
+   if (!compress) {
+     return new Writer(fs, conf, file, keyClass, valClass, progress, metadata);
+   }
+   if (blockCompress) {
+     return new BlockCompressWriter(fs, conf, file, keyClass, valClass,
+         codec, progress, metadata);
+   }
+   return new RecordCompressWriter(fs, conf, file, keyClass, valClass,
+       codec, progress, metadata);
+ }
+
+ /**
+  * Construct the preferred type of 'raw' SequenceFile Writer on top of an
+  * existing stream, choosing the writer class from <code>compressionType</code>.
+  * @param conf The configuration.
+  * @param out The stream on top which the writer is to be constructed.
+  * @param keyClass The 'key' type.
+  * @param valClass The 'value' type.
+  * @param compressionType The compression type.
+  * @param codec The compression codec.
+  * @param metadata The metadata of the file.
+  * @return Returns the handle to the constructed SequenceFile Writer, or
+  *         null if <code>compressionType</code> is null.
+  * @throws IOException
+  * @throws IllegalArgumentException if <code>codec</code> is a GzipCodec
+  *         and native hadoop/zlib code is not loaded
+  */
+ public static Writer
+ createWriter(Configuration conf, FSDataOutputStream out,
+     Class keyClass, Class valClass, CompressionType compressionType,
+     CompressionCodec codec, Metadata metadata)
+     throws IOException {
+   boolean gzipWithoutNative = codec instanceof GzipCodec
+       && !NativeCodeLoader.isNativeCodeLoaded()
+       && !ZlibFactory.isNativeZlibLoaded(conf);
+   if (gzipWithoutNative) {
+     throw new IllegalArgumentException("SequenceFile doesn't work with " +
+         "GzipCodec without native-hadoop code!");
+   }
+
+   if (compressionType == CompressionType.NONE) {
+     return new Writer(conf, out, keyClass, valClass, metadata);
+   }
+   if (compressionType == CompressionType.RECORD) {
+     return new RecordCompressWriter(conf, out, keyClass, valClass, codec, metadata);
+   }
+   if (compressionType == CompressionType.BLOCK) {
+     return new BlockCompressWriter(conf, out, keyClass, valClass, codec, metadata);
+   }
+   // Reached only when compressionType is null.
+   return null;
+ }
+
+ /**
+  * Construct the preferred type of 'raw' SequenceFile Writer on top of an
+  * existing stream, with empty metadata.
+  * @param conf The configuration.
+  * @param out The stream on top which the writer is to be constructed.
+  * @param keyClass The 'key' type.
+  * @param valClass The 'value' type.
+  * @param compressionType The compression type.
+  * @param codec The compression codec.
+  * @return Returns the handle to the constructed SequenceFile Writer.
+  * @throws IOException
+  */
+ public static Writer
+ createWriter(Configuration conf, FSDataOutputStream out,
+     Class keyClass, Class valClass, CompressionType compressionType,
+     CompressionCodec codec)
+     throws IOException {
+   return createWriter(conf, out, keyClass, valClass, compressionType,
+       codec, new Metadata());
+ }
+
+
+ /**
+  * Access to the 'raw' (still serialized, possibly compressed) value bytes
+  * of a SequenceFile record.
+  */
+ public static interface ValueBytes {
+
+   /**
+    * Writes the value bytes to <code>outStream</code> in uncompressed form,
+    * decompressing them first if they are held compressed.
+    * @param outStream stream that receives the uncompressed bytes
+    * @throws IOException
+    */
+   void writeUncompressedBytes(DataOutputStream outStream)
+     throws IOException;
+
+   /**
+    * Writes the value bytes to <code>outStream</code> exactly as held.
+    * Note: bytes that are not already compressed are NOT compressed here.
+    * @param outStream stream that receives the compressed bytes
+    * @throws IllegalArgumentException if an implementation holding
+    *         uncompressed data cannot supply compressed bytes
+    * @throws IOException
+    */
+   void writeCompressedBytes(DataOutputStream outStream)
+     throws IllegalArgumentException, IOException;
+
+   /**
+    * @return the number of bytes of stored data
+    */
+   int getSize();
+ }
+
+ private static class UncompressedBytes implements ValueBytes {
+ private int dataSize;
+ private byte[] data;
+
+ private UncompressedBytes() {
+ data = null;
+ dataSize = 0;
+ }
+
+ private void reset(DataInputStream in, int length) throws IOException {
+ data = new byte[length];
+ dataSize = -1;
+
+ in.readFully(data);
+ dataSize = data.length;
+ }
+
+ public int getSize() {
+ return dataSize;
+ }
+
+ public void writeUncompressedBytes(DataOutputStream outStream)
+ throws IOException {
+ outStream.write(data, 0, dataSize);
+ }
+
+ public void writeCompressedBytes(DataOutputStream outStream)
+ throws IllegalArgumentException, IOException {
+ throw
+ new IllegalArgumentException("UncompressedBytes cannot be compressed!");
+ }
+
+ } // UncompressedBytes
+
+ /**
+  * {@link ValueBytes} holding a value's bytes in compressed form; they are
+  * decompressed on demand with the supplied codec.
+  */
+ private static class CompressedBytes implements ValueBytes {
+   private int dataSize;
+   private byte[] data;
+   DataInputBuffer rawData = null;
+   CompressionCodec codec = null;
+   CompressionInputStream decompressedStream = null;
+   // Copy buffer for writeUncompressedBytes, allocated together with the
+   // decompression stream and reused on later calls.
+   private byte[] copyBuffer = null;
+
+   private CompressedBytes(CompressionCodec codec) {
+     data = null;
+     dataSize = 0;
+     this.codec = codec;
+   }
+
+   /**
+    * Reads <code>length</code> compressed bytes from <code>in</code>,
+    * replacing the previously held value. The backing array is reused when
+    * it is large enough, and otherwise grown (at least doubling), avoiding an
+    * allocation per record.
+    * @throws IOException if <code>length</code> is negative or the read fails
+    */
+   private void reset(DataInputStream in, int length) throws IOException {
+     if (length < 0) {
+       throw new IOException("Negative value length: " + length);
+     }
+     if (data == null || length > data.length) {
+       int doubled = (data == null) ? 0 : data.length * 2;
+       data = new byte[Math.max(length, doubled)];
+     }
+     dataSize = -1; // invalid until the read below completes
+     in.readFully(data, 0, length);
+     dataSize = length;
+   }
+
+   public int getSize() {
+     return dataSize;
+   }
+
+   /**
+    * Decompresses the held bytes with the codec and writes the result to
+    * <code>outStream</code>. The decompression stream is created on first use
+    * and reset on later calls.
+    */
+   public void writeUncompressedBytes(DataOutputStream outStream)
+     throws IOException {
+     if (decompressedStream == null) {
+       rawData = new DataInputBuffer();
+       decompressedStream = codec.createInputStream(rawData);
+       copyBuffer = new byte[8192];
+     } else {
+       decompressedStream.resetState();
+     }
+     rawData.reset(data, 0, dataSize);
+
+     int bytesRead;
+     while ((bytesRead = decompressedStream.read(copyBuffer, 0, copyBuffer.length)) != -1) {
+       outStream.write(copyBuffer, 0, bytesRead);
+     }
+   }
+
+   /** Writes the held compressed bytes unchanged. */
+   public void writeCompressedBytes(DataOutputStream outStream)
+     throws IllegalArgumentException, IOException {
+     outStream.write(data, 0, dataSize);
+   }
+
+ } // CompressedBytes
+
+ /**
+  * The metadata of a file: attribute name/value pairs, both of
+  * {@link Text} type, kept sorted by name.
+  */
+ public static class Metadata implements Writable {
+
+   private TreeMap<Text, Text> theMetadata;
+
+   /** Creates empty metadata. */
+   public Metadata() {
+     this(new TreeMap<Text, Text>());
+   }
+
+   /**
+    * Creates metadata backed by <code>arg</code>. The map is used directly,
+    * not copied; a null map yields empty metadata.
+    */
+   public Metadata(TreeMap<Text, Text> arg) {
+     if (arg == null) {
+       this.theMetadata = new TreeMap<Text, Text>();
+     } else {
+       this.theMetadata = arg;
+     }
+   }
+
+   /** @return the value stored under <code>name</code>, or null if absent */
+   public Text get(Text name) {
+     return this.theMetadata.get(name);
+   }
+
+   /** Stores <code>value</code> under <code>name</code>, replacing any previous value. */
+   public void set(Text name, Text value) {
+     this.theMetadata.put(name, value);
+   }
+
+   /** @return a copy of the name/value pairs; changing it does not affect this object */
+   public TreeMap<Text, Text> getMetadata() {
+     return new TreeMap<Text, Text>(this.theMetadata);
+   }
+
+   /** Writes the entry count, then each name and value in key order. */
+   public void write(DataOutput out) throws IOException {
+     out.writeInt(this.theMetadata.size());
+     for (Map.Entry<Text, Text> en : this.theMetadata.entrySet()) {
+       en.getKey().write(out);
+       en.getValue().write(out);
+     }
+   }
+
+   /** Replaces the contents with entries read in the format produced by {@link #write}. */
+   public void readFields(DataInput in) throws IOException {
+     int sz = in.readInt();
+     if (sz < 0) throw new IOException("Invalid size: " + sz + " for file metadata object");
+     this.theMetadata = new TreeMap<Text, Text>();
+     for (int i = 0; i < sz; i++) {
+       Text key = new Text();
+       Text val = new Text();
+       key.readFields(in);
+       val.readFields(in);
+       this.theMetadata.put(key, val);
+     }
+   }
+
+   /** Compares the two metadata objects entry by entry, in key order. */
+   public boolean equals(Metadata other) {
+     if (other == null) return false;
+     if (this.theMetadata.size() != other.theMetadata.size()) {
+       return false;
+     }
+     Iterator<Map.Entry<Text, Text>> iter1 =
+       this.theMetadata.entrySet().iterator();
+     Iterator<Map.Entry<Text, Text>> iter2 =
+       other.theMetadata.entrySet().iterator();
+     while (iter1.hasNext() && iter2.hasNext()) {
+       Map.Entry<Text, Text> en1 = iter1.next();
+       Map.Entry<Text, Text> en2 = iter2.next();
+       if (!en1.getKey().equals(en2.getKey())) {
+         return false;
+       }
+       if (!en1.getValue().equals(en2.getValue())) {
+         return false;
+       }
+     }
+     if (iter1.hasNext() || iter2.hasNext()) {
+       return false;
+     }
+     return true;
+   }
+
+   /**
+    * Overrides {@link Object#equals(Object)}. Without it, collections and
+    * other Object-typed callers would use identity instead of
+    * {@link #equals(Metadata)}.
+    */
+   @Override
+   public boolean equals(Object other) {
+     return (other instanceof Metadata) && equals((Metadata) other);
+   }
+
+   /**
+    * Consistent with {@link #equals(Object)}: equal entries produce equal map
+    * hash codes. This replaces an <code>assert false</code> placeholder that
+    * failed whenever assertions were enabled.
+    */
+   @Override
+   public int hashCode() {
+     return this.theMetadata.hashCode();
+   }
+
+   @Override
+   public String toString() {
+     StringBuilder sb = new StringBuilder();
+     sb.append("size: ").append(this.theMetadata.size()).append("\n");
+     for (Map.Entry<Text, Text> en : this.theMetadata.entrySet()) {
+       sb.append("\t").append(en.getKey().toString()).append("\t").append(en.getValue().toString());
+       sb.append("\n");
+     }
+     return sb.toString();
+   }
+ }
+
+ /** Write key/value pairs to a sequence-format file. */
+ public static class Writer implements java.io.Closeable {
+ Configuration conf;
+ FSDataOutputStream out;
+ // False when the stream was handed in by the caller; close() then only flushes it.
+ boolean ownOutputStream = true;
+ // Staging buffer that the key/value serializers write each record into.
+ DataOutputBuffer buffer = new DataOutputBuffer();
+
+ Class keyClass;
+ Class valClass;
+
+ private boolean compress;
+ CompressionCodec codec = null;
+ CompressionOutputStream deflateFilter = null;
+ DataOutputStream deflateOut = null;
+ Metadata metadata = null;
+ Compressor compressor = null;
+
+ protected Serializer keySerializer;
+ protected Serializer uncompressedValSerializer;
+ protected Serializer compressedValSerializer;
+
+ // A globally unique 16-byte value is inserted every few entries, so a reader
+ // can seek into the middle of a file and resynchronize with record starts
+ // and ends by scanning for it.
+ long lastSyncPos; // position of last sync
+ byte[] sync = newSyncMarker(); // 16 random bytes
+
+ /**
+  * Builds a sync marker: the MD5 digest of a fresh {@link UID} joined with
+  * the current time. Any failure is rethrown as a RuntimeException.
+  */
+ private static byte[] newSyncMarker() {
+   try {
+     MessageDigest md5 = MessageDigest.getInstance("MD5");
+     long now = System.currentTimeMillis();
+     String seed = new UID() + "@" + now;
+     md5.update(seed.getBytes());
+     return md5.digest();
+   } catch (Exception e) {
+     throw new RuntimeException(e);
+   }
+ }
+
+ /** No-argument constructor, needed for the period of transition; sets up nothing. */
+ Writer() {
+ }
+
+ /** Create the named file with no progress reporter and empty metadata. */
+ public Writer(FileSystem fs, Configuration conf, Path name,
+     Class keyClass, Class valClass)
+     throws IOException {
+   this(fs, conf, name, keyClass, valClass, null, new Metadata());
+ }
+
+ /**
+  * Create the named file with a write-progress reporter. The buffer size is
+  * read from <code>io.file.buffer.size</code> in the filesystem's
+  * configuration (4096 if unset); replication and block size are the
+  * filesystem defaults.
+  */
+ public Writer(FileSystem fs, Configuration conf, Path name,
+     Class keyClass, Class valClass,
+     Progressable progress, Metadata metadata)
+     throws IOException {
+   this(fs, conf, name, keyClass, valClass,
+       fs.getConf().getInt("io.file.buffer.size", 4096),
+       fs.getDefaultReplication(),
+       fs.getDefaultBlockSize(),
+       progress, metadata);
+ }
+
+ /**
+  * Create (overwriting) the named file with the given buffer size,
+  * replication, block size and write-progress reporter, then write the
+  * complete file header.
+  */
+ public Writer(FileSystem fs, Configuration conf, Path name,
+     Class keyClass, Class valClass,
+     int bufferSize, short replication, long blockSize,
+     Progressable progress, Metadata metadata)
+     throws IOException {
+   FSDataOutputStream stream =
+       fs.create(name, true, bufferSize, replication, blockSize, progress);
+   init(name, conf, stream, keyClass, valClass, false, null, metadata);
+   initializeFileHeader();
+   writeFileHeader();
+   finalizeFileHeader();
+ }
+
+ /**
+  * Write to a caller-supplied stream and then write the complete file
+  * header. The stream is not owned by this writer: close() flushes it but
+  * does not close it.
+  */
+ private Writer(Configuration conf, FSDataOutputStream out,
+     Class keyClass, Class valClass, Metadata metadata)
+     throws IOException {
+   this.ownOutputStream = false;
+   init(null, conf, out, keyClass, valClass, false, null, metadata);
+   initializeFileHeader();
+   writeFileHeader();
+   finalizeFileHeader();
+ }
+
+ /** Write the initial part of the file header: the magic/version bytes. */
+ void initializeFileHeader()
+   throws IOException{
+   out.write(VERSION);
+ }
+
+ /**
+  * Write the final part of the file header, this writer's sync marker, and
+  * flush the stream so that the complete header has been handed to it.
+  */
+ void finalizeFileHeader()
+   throws IOException{
+   out.write(sync); // write the sync bytes
+   out.flush(); // flush header
+ }
+
+ /** @return whether compression is enabled for this writer */
+ boolean isCompressed() { return compress; }
+
+ /** @return whether records are block-compressed; always false for this class */
+ boolean isBlockCompressed() { return false; }
+
+ /**
+  * Write the middle part of the file header: the key and value class
+  * names, the compression and block-compression flags, the codec class
+  * name (only when compressed) and the file metadata.
+  * This method does not flush the stream; finalizeFileHeader() does.
+  */
+ void writeFileHeader()
+   throws IOException {
+   Text.writeString(out, keyClass.getName());
+   Text.writeString(out, valClass.getName());
+
+   out.writeBoolean(this.isCompressed());
+   out.writeBoolean(this.isBlockCompressed());
+
+   if (this.isCompressed()) {
+     Text.writeString(out, (codec.getClass()).getName());
+   }
+   this.metadata.write(out);
+ }
+
+ /**
+  * Initialize the writer's fields and open its serializers.
+  * Keys and uncompressed values serialize straight into the staging buffer.
+  * When a codec is supplied, it receives the configuration, a pooled
+  * compressor is borrowed, and a third serializer writes values through
+  * the codec's output stream into that same buffer.
+  * @param name the file name; not used by this method
+  */
+ @SuppressWarnings("unchecked")
+ void init(Path name, Configuration conf, FSDataOutputStream out,
+     Class keyClass, Class valClass,
+     boolean compress, CompressionCodec codec, Metadata metadata)
+     throws IOException {
+   this.conf = conf;
+   this.out = out;
+   this.keyClass = keyClass;
+   this.valClass = valClass;
+   this.compress = compress;
+   this.codec = codec;
+   this.metadata = metadata;
+
+   SerializationFactory factory = new SerializationFactory(conf);
+   keySerializer = factory.getSerializer(keyClass);
+   keySerializer.open(buffer);
+   uncompressedValSerializer = factory.getSerializer(valClass);
+   uncompressedValSerializer.open(buffer);
+
+   if (codec == null) {
+     return; // no compression path to set up
+   }
+   ReflectionUtils.setConf(codec, conf);
+   compressor = CodecPool.getCompressor(codec);
+   deflateFilter = codec.createOutputStream(buffer, compressor);
+   deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter));
+   compressedValSerializer = factory.getSerializer(valClass);
+   compressedValSerializer.open(deflateOut);
+ }
+
+ /** Returns the class of keys in this file. */
+ public Class getKeyClass() { return keyClass; }
+
+ /** Returns the class of values in this file. */
+ public Class getValueClass() { return valClass; }
+
+ /** Returns the compression codec of data in this file. */
+ public CompressionCodec getCompressionCodec() { return codec; }
+
+ /**
+  * Writes a sync point: the {@code SYNC_ESCAPE} int followed by the sync
+  * bytes. Nothing is written when there are no sync bytes, or when a marker
+  * was already written at the current stream position.
+  */
+ public void sync() throws IOException {
+   if (sync == null) {
+     return;
+   }
+   if (out.getPos() == lastSyncPos) {
+     return; // a marker already sits exactly here
+   }
+   out.writeInt(SYNC_ESCAPE);
+   out.write(sync);
+   lastSyncPos = out.getPos();
+ }
+
+ /** @return the configuration this writer was initialized with. */
+ Configuration getConf() {
+   return this.conf;
+ }
+
+ /**
+  * Close the file.
+  *
+  * <p>Closes the key/value serializers, returns the compressor to the
+  * {@link CodecPool} and then either closes the output stream (when this
+  * writer owns it) or just flushes it (when the caller supplied it).
+  * The compressor is returned and the stream is closed/flushed even if
+  * closing a serializer throws, so a failing serializer no longer leaks
+  * the compressor or the file handle.
+  */
+ public synchronized void close() throws IOException {
+   try {
+     keySerializer.close();
+     uncompressedValSerializer.close();
+     if (compressedValSerializer != null) {
+       compressedValSerializer.close();
+     }
+   } finally {
+     CodecPool.returnCompressor(compressor);
+     compressor = null;
+
+     if (out != null) {
+       // Close the underlying stream iff we own it...
+       if (ownOutputStream) {
+         out.close();
+       } else {
+         out.flush();
+       }
+       out = null;
+     }
+   }
+ }
+
+ /**
+  * Emits a sync point once at least {@code SYNC_INTERVAL} bytes have been
+  * written since the previous one (only when sync bytes exist).
+  */
+ synchronized void checkAndWriteSync() throws IOException {
+   if (sync == null) {
+     return;
+   }
+   long nextSyncAt = lastSyncPos + SYNC_INTERVAL;
+   if (out.getPos() >= nextSyncAt) {
+     sync();
+   }
+ }
+
+ /**
+  * Append a key/value pair of {@code Writable}s by delegating to the
+  * (possibly overridden) {@code append(Object, Object)}.
+  */
+ public synchronized void append(Writable key, Writable val)
+     throws IOException {
+   Object k = key;
+   Object v = val;
+   append(k, v);
+ }
+
+ /**
+  * Append a key/value pair.
+  *
+  * <p>The key and value are serialized into the staging {@code buffer}. The
+  * value goes through the codec stream when {@code compress} is set. The
+  * record is then written as [record length][key length][key bytes][value
+  * bytes], preceded by a sync point when one is due.
+  *
+  * @throws IOException if the key or value is not exactly of the configured
+  *     class, or on a write failure
+  */
+ @SuppressWarnings("unchecked")
+ public synchronized void append(Object key, Object val)
+     throws IOException {
+   Class actualKeyClass = key.getClass();
+   if (actualKeyClass != keyClass) {
+     throw new IOException("wrong key class: " + actualKeyClass.getName()
+         + " is not " + keyClass);
+   }
+   Class actualValClass = val.getClass();
+   if (actualValClass != valClass) {
+     throw new IOException("wrong value class: " + actualValClass.getName()
+         + " is not " + valClass);
+   }
+
+   buffer.reset();
+
+   keySerializer.serialize(key);
+   final int keyBytes = buffer.getLength();
+   if (keyBytes < 0) {
+     throw new IOException("negative length keys not allowed: " + key);
+   }
+
+   if (!compress) {
+     uncompressedValSerializer.serialize(val);
+   } else {
+     deflateFilter.resetState();
+     compressedValSerializer.serialize(val);
+     deflateOut.flush();
+     deflateFilter.finish();
+   }
+
+   checkAndWriteSync();
+   final int recordBytes = buffer.getLength();
+   out.writeInt(recordBytes);
+   out.writeInt(keyBytes);
+   out.write(buffer.getData(), 0, recordBytes);
+ }
+
+ /**
+  * Appends a record whose key is already serialized in {@code keyData} and
+  * whose value is supplied as {@link ValueBytes}. The value is written via
+  * {@code writeUncompressedBytes}, preceded by a sync point when one is due.
+  */
+ public synchronized void appendRaw(byte[] keyData, int keyOffset,
+     int keyLength, ValueBytes val) throws IOException {
+   if (keyLength < 0) {
+     throw new IOException("negative length keys not allowed: " + keyLength);
+   }
+   final int totalLength = keyLength + val.getSize();
+
+   checkAndWriteSync();
+
+   out.writeInt(totalLength);
+   out.writeInt(keyLength);
+   out.write(keyData, keyOffset, keyLength);
+   val.writeUncompressedBytes(out);
+ }
+
+ /**
+  * Returns the current length of the output file, i.e. the output stream's
+  * position.
+  *
+  * <p>The returned value is always a synchronized position: it may be passed
+  * to {@link SequenceFile.Reader#seek(long)}, after which
+  * {@link SequenceFile.Reader#next(Writable)} may be called. The key read
+  * there can, however, be earlier in the file than the last key written
+  * when this method was called (e.g. with block compression it may be the
+  * first key of the block that was being written).
+  */
+ public synchronized long getLength() throws IOException {
+   final long position = out.getPos();
+   return position;
+ }
+
+ } // class Writer
+
+ /** Write key/compressed-value pairs to a sequence-format file. */
+ static class RecordCompressWriter extends Writer {
+
+   /**
+    * Create the named file.
+    *
+    * <p>Delegates to the fully-specified constructor (configured buffer size,
+    * default replication and block size, no progress reporter). This writer
+    * therefore owns the stream it creates and closes it in {@code close()}.
+    * Previously this constructor went through the private stream-wrapping
+    * constructor, which marks the stream as not owned, so the created file
+    * was only flushed and never closed.
+    */
+   public RecordCompressWriter(FileSystem fs, Configuration conf, Path name,
+       Class keyClass, Class valClass, CompressionCodec codec)
+       throws IOException {
+     this(fs, conf, name, keyClass, valClass,
+         fs.getConf().getInt("io.file.buffer.size", 4096),
+         fs.getDefaultReplication(), fs.getDefaultBlockSize(), codec,
+         null, new Metadata());
+   }
+
+   /** Create the named file with write-progress reporter. */
+   public RecordCompressWriter(FileSystem fs, Configuration conf, Path name,
+       Class keyClass, Class valClass, CompressionCodec codec,
+       Progressable progress, Metadata metadata)
+       throws IOException {
+     this(fs, conf, name, keyClass, valClass,
+         fs.getConf().getInt("io.file.buffer.size", 4096),
+         fs.getDefaultReplication(), fs.getDefaultBlockSize(), codec,
+         progress, metadata);
+   }
+
+   /**
+    * Create the named file with write-progress reporter, using the given
+    * buffer size, replication and block size. The created stream is owned
+    * (and closed) by this writer.
+    */
+   public RecordCompressWriter(FileSystem fs, Configuration conf, Path name,
+       Class keyClass, Class valClass,
+       int bufferSize, short replication, long blockSize,
+       CompressionCodec codec,
+       Progressable progress, Metadata metadata)
+       throws IOException {
+     super.init(name, conf,
+         fs.create(name, true, bufferSize, replication, blockSize, progress),
+         keyClass, valClass, true, codec, metadata);
+
+     initializeFileHeader();
+     writeFileHeader();
+     finalizeFileHeader();
+   }
+
+   /** Create the named file with write-progress reporter. */
+   public RecordCompressWriter(FileSystem fs, Configuration conf, Path name,
+       Class keyClass, Class valClass, CompressionCodec codec,
+       Progressable progress)
+       throws IOException {
+     this(fs, conf, name, keyClass, valClass, codec, progress, new Metadata());
+   }
+
+   /**
+    * Write to an arbitrary caller-supplied stream. The stream is not owned:
+    * {@code close()} only flushes it.
+    */
+   private RecordCompressWriter(Configuration conf, FSDataOutputStream out,
+       Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
+       throws IOException {
+     this.ownOutputStream = false;
+     super.init(null, conf, out, keyClass, valClass, true, codec, metadata);
+
+     initializeFileHeader();
+     writeFileHeader();
+     finalizeFileHeader();
+   }
+
+   boolean isCompressed() { return true; }
+   boolean isBlockCompressed() { return false; }
+
+   /**
+    * Append a key/value pair: the key is serialized as-is and the value is
+    * compressed through the codec stream before the record is written.
+    */
+   @SuppressWarnings("unchecked")
+   public synchronized void append(Object key, Object val)
+       throws IOException {
+     if (key.getClass() != keyClass)
+       throw new IOException("wrong key class: "+key.getClass().getName()
+           +" is not "+keyClass);
+     if (val.getClass() != valClass)
+       throw new IOException("wrong value class: "+val.getClass().getName()
+           +" is not "+valClass);
+
+     buffer.reset();
+
+     // Append the 'key'
+     keySerializer.serialize(key);
+     int keyLength = buffer.getLength();
+     if (keyLength < 0)
+       throw new IOException("negative length keys not allowed: " + key);
+
+     // Compress 'value' and append it
+     deflateFilter.resetState();
+     compressedValSerializer.serialize(val);
+     deflateOut.flush();
+     deflateFilter.finish();
+
+     // Write the record out
+     checkAndWriteSync(); // sync
+     out.writeInt(buffer.getLength()); // total record length
+     out.writeInt(keyLength); // key portion length
+     out.write(buffer.getData(), 0, buffer.getLength()); // data
+   }
+
+   /**
+    * Append a record with a pre-serialized key and a raw value; the value
+    * is written via {@code writeCompressedBytes}.
+    */
+   public synchronized void appendRaw(byte[] keyData, int keyOffset,
+       int keyLength, ValueBytes val) throws IOException {
+
+     if (keyLength < 0)
+       throw new IOException("negative length keys not allowed: " + keyLength);
+
+     int valLength = val.getSize();
+
+     checkAndWriteSync(); // sync
+     out.writeInt(keyLength+valLength); // total record length
+     out.writeInt(keyLength); // key portion length
+     out.write(keyData, keyOffset, keyLength); // 'key' data
+     val.writeCompressedBytes(out); // 'value' data
+   }
+
+ } // RecordCompressWriter
+
+ /** Write compressed key/value blocks to a sequence-format file. */
+ static class BlockCompressWriter extends Writer {
+
+ private int noBufferedRecords = 0;
+
+ private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
+ private DataOutputBuffer keyBuffer = new DataOutputBuffer();
+
+ private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
+ private DataOutputBuffer valBuffer = new DataOutputBuffer();
+
+ private int compressionBlockSize;
+
+ /** Create the named file. */
+ public BlockCompressWriter(FileSystem fs, Configuration conf, Path name,
+ Class keyClass, Class valClass, CompressionCodec codec)
+ throws IOException {
+ this(fs, conf, name, keyClass, valClass,
+ fs.getConf().getInt("io.file.buffer.size", 4096),
+ fs.getDefaultReplication(), fs.getDefaultBlockSize(), codec,
+ null, new Metadata());
+ }
+
+ /** Create the named file with write-progress reporter. */
+ public BlockCompressWriter(FileSystem fs, Configuration conf, Path name,
+ Class keyClass, Class valClass, CompressionCodec codec,
+ Progressable progress, Metadata metadata)
+ throws IOException {
+ this(fs, conf, name, keyClass, valClass,
+ fs.getConf().getInt("io.file.buffer.size", 4096),
+ fs.getDefaultReplication(), fs.getDefaultBlockSize(), codec,
+ progress, metadata);
+ }
+
+ /** Create the named file with write-progress reporter. */
+ public BlockCompressWriter(FileSystem fs, Configuration conf, Path name,
+ Class keyClass, Class valClass,
+ int bufferSize, short replication, long blockSize,
+ CompressionCodec codec,
+ Progressable progress, Metadata metadata)
+ throws IOException {
+ super.init(name, conf,
+ fs.create(name, true, bufferSize, replication, blockSize, progress),
+ keyClass, valClass, true, codec, metadata);
+ init(conf.getInt("io.seqfile.compress.blocksize", 1000000));
+
+ initializeFileHeader();
+ writeFileHeader();
+ finalizeFileHeader();
+ }
+
+ /** Create the named file with write-progress reporter. */
+ public BlockCompressWriter(FileSystem fs, Configuration conf, Path name,
+ Class keyClass, Class valClass, CompressionCodec codec,
+ Progressable progress)
+ throws IOException {
+ this(fs, conf, name, keyClass, valClass, codec, progress, new Metadata());
+ }
+
+ /** Write to an arbitrary stream using a specified buffer size. */
+ private BlockCompressWriter(Configuration conf, FSDataOutputStream out,
+ Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
+ throws IOException {
+ this.ownOutputStream = false;
+ super.init(null, conf, out, keyClass, valClass, true, codec, metadata);
+ init(1000000);
+
+ initializeFileHeader();
+ writeFileHeader();
+ finalizeFileHeader();
+ }
+
+ boolean isCompressed() { return true; }
+ boolean isBlockCompressed() { return true; }
+
+ /** Initialize */
+ void init(int compressionBlockSize) throws IOException {
+ this.compressionBlockSize = compressionBlockSize;
+ keySerializer.close();
+ keySerializer.open(keyBuffer);
+ uncompressedValSerializer.close();
+ uncompressedValSerializer.open(valBuffer);
+ }
+
+ /** Workhorse to check and write out compressed data/lengths */
+ private synchronized
+ void writeBuffer(DataOutputBuffer uncompressedDataBuffer)
+ throws IOException {
+ deflateFilter.resetState();
+ buffer.reset();
+ deflateOut.write(uncompressedDataBuffer.getData(), 0,
+ uncompressedDataBuffer.getLength());
+ deflateOut.flush();
+ deflateFilter.finish();
+
+ WritableUtils.writeVInt(out, buffer.getLength());
+ out.write(buffer.getData(), 0, buffer.getLength());
+ }
+
+ /** Compress and flush contents to dfs */
+ public synchronized void sync() throws IOException {
+ if (noBufferedRecords > 0) {
+ super.sync();
+
+ // No. of records
+ WritableUtils.writeVInt(out, noBufferedRecords);
+
+ // Write 'keys' and lengths
+ writeBuffer(keyLenBuffer);
+ writeBuffer(keyBuffer);
+
+ // Write 'values' and lengths
+ writeBuffer(valLenBuffer);
+ writeBuffer(valBuffer);
+
+ // Flush the file-stream
+ out.flush();
+
+ // Reset internal states
+ keyLenBuffer.reset();
+ keyBuffer.reset();
+ valLenBuffer.reset();
+ valBuffer.reset();
+ noBufferedRecords = 0;
+ }
+
+ }
+
+ /** Close the file. */
+ public synchronized void close() throws IOException {
+ if (out != null) {
+ sync();
+ }
+ super.close();
+ }
+
+ /** Append a key/value pair. */
+ @SuppressWarnings("unchecked")
+ public synchronized void append(Object key, Object val)
+ throws IOException {
+ if (key.getClass() != keyClass)
+ throw new IOException("wrong key class: "+key+" is not "+keyClass);
+ if (val.getClass() != valClass)
+ throw new IOException("wrong value class: "+val+" is not "+valClass);
+
+ // Save key/value into respective buffers
+ int oldKeyLength = keyBuffer.getLength();
+ keySerializer.serialize(key);
+ int keyLength = keyBuffer.getLength() - oldKeyLength;
+ if (keyLength < 0)
+ throw new IOException("negative length keys not allowed: " + key);
+ WritableUtils.writeVInt(keyLenBuffer, keyLength);
+
+ int oldValLength = valBuffer.getLength();
+ uncompressedValSerializer.serialize(val);
+ int valLength = valBuffer.getLength() - oldValLength;
+ WritableUtils.writeVInt(valLenBuffer, valLength);
+
+ // Added another key/value pair
+ ++noBufferedRecords;
+
+ // Compress and flush?
+ int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
+ if (currentBlockSize >= compressionBlockSize) {
+ sync();
+ }
+ }
+
+ /** Append a key/value pair. */
+ public synchronized void appendRaw(byte[] keyData, int keyOffset,
+ int keyLength, ValueBytes val) throws IOException {
+
+ if (keyLength < 0)
+ throw new IOException("negative length keys not allowed");
+
+ int valLength = val.getSize();
+
+ // Save key/value data in relevant buffers
+ WritableUtils.writeVInt(keyLenBuffer, keyLength);
+ keyBuffer.write(keyData, keyOffset, keyLength);
+ WritableUtils.writeVInt(valLenBuffer, valLength);
+ val.writeUncompressedBytes(valBuffer);
+
+ // Added another key/value pair
+ ++noBufferedRecords;
+
+ // Compress and flush?
+ int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
+ if (currentBlockSize >= compressionBlockSize) {
+ sync();
+ }
+ }
+
+ } // BlockCompressionWriter
+
+ /** Reads key/value pairs from a sequence-format file. */
+ public static class Reader implements java.io.Closeable {
+ // The file being read; reported in the "not a SequenceFile" error.
+ private Path file;
+ // Underlying input stream, obtained from openFile().
+ private FSDataInputStream in;
+ // Scratch buffer that next(Writable)/next(Object) copy a whole record into
+ // (record-level files only).
+ private DataOutputBuffer outBuf = new DataOutputBuffer(32);
+
+ // Format version byte read from the header.
+ private byte version;
+
+ // Key/value class names from the header; resolved lazily into
+ // keyClass/valClass by getKeyClass()/getValueClass().
+ private String keyClassName;
+ private String valClassName;
+ private Class keyClass;
+ private Class valClass;
+
+ // Codec (only set when the header says values are compressed) and the
+ // metadata read from the header.
+ private CompressionCodec codec = null;
+ private Metadata metadata = null;
+
+ // Sync marker read from the header, scratch array that markers found in
+ // the stream are compared against, and whether the last read crossed one.
+ private byte[] sync = new byte[SYNC_HASH_SIZE];
+ private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
+ private boolean syncSeen;
+
+ // Stream position at which reading stops.
+ private long end;
+ // Key length of the current record (set by the record-level paths of
+ // next()/nextRawKey(); used in getCurrentValue() error messages).
+ private int keyLength;
+ // Record length last read by nextRawKey().
+ private int recordLength;
+
+ // Header flags: values compressed / records grouped in compressed blocks.
+ private boolean decompress;
+ private boolean blockCompressed;
+
+ private Configuration conf;
+
+ // Block-compressed state: records in the current block; whether value
+ // sections are read only when a value is first requested; and whether
+ // they have been read for the current block.
+ private int noBufferedRecords = 0;
+ private boolean lazyDecompress = true;
+ private boolean valuesDecompressed = true;
+
+ // Keys / values of the current block not yet consumed.
+ private int noBufferedKeys = 0;
+ private int noBufferedValues = 0;
+
+ // Block-compressed sections: raw section bytes, codec stream over them,
+ // data stream for reading, and the pooled decompressor (returned in close()).
+ private DataInputBuffer keyLenBuffer = null;
+ private CompressionInputStream keyLenInFilter = null;
+ private DataInputStream keyLenIn = null;
+ private Decompressor keyLenDecompressor = null;
+ private DataInputBuffer keyBuffer = null;
+ private CompressionInputStream keyInFilter = null;
+ private DataInputStream keyIn = null;
+ private Decompressor keyDecompressor = null;
+
+ // Value sections. valBuffer/valIn are also used for record-level files,
+ // where valBuffer holds the whole current record.
+ private DataInputBuffer valLenBuffer = null;
+ private CompressionInputStream valLenInFilter = null;
+ private DataInputStream valLenIn = null;
+ private Decompressor valLenDecompressor = null;
+ private DataInputBuffer valBuffer = null;
+ private CompressionInputStream valInFilter = null;
+ private DataInputStream valIn = null;
+ private Decompressor valDecompressor = null;
+
+ // Deserializers for the key and value classes (null for temporary readers).
+ private Deserializer keyDeserializer;
+ private Deserializer valDeserializer;
+
+ /**
+  * Open the named file for reading from its start to its current length,
+  * using the configured {@code io.file.buffer.size}.
+  */
+ public Reader(FileSystem fs, Path file, Configuration conf)
+     throws IOException {
+   this(fs, file, conf.getInt("io.file.buffer.size", 4096), 0,
+       fs.getLength(file), conf, false);
+ }
+
+ /**
+  * Open the whole file with an explicit buffer size; {@code tempReader}
+  * readers only parse the header.
+  */
+ private Reader(FileSystem fs, Path file, int bufferSize,
+     Configuration conf, boolean tempReader) throws IOException {
+   this(fs, file, bufferSize, 0, fs.getLength(file), conf, tempReader);
+ }
+
+ /**
+  * Open {@code file}, position it with {@code seek(start)}, and stop reading
+  * {@code length} bytes past that position; then parse the header via
+  * {@code init(tempReader)}.
+  *
+  * <p>If positioning or initialization fails (for example the file is not
+  * a SequenceFile, or names an unknown codec), the freshly opened stream is
+  * closed before the exception propagates so the file handle is not leaked.
+  */
+ private Reader(FileSystem fs, Path file, int bufferSize, long start,
+     long length, Configuration conf, boolean tempReader)
+     throws IOException {
+   this.file = file;
+   this.in = openFile(fs, file, bufferSize, length);
+   this.conf = conf;
+   boolean initialized = false;
+   try {
+     seek(start);
+     this.end = in.getPos() + length;
+     init(tempReader);
+     initialized = true;
+   } finally {
+     if (!initialized && in != null) {
+       try {
+         in.close();
+       } catch (IOException ignored) {
+         // Keep the original failure; a secondary close error adds nothing.
+       }
+     }
+   }
+ }
+
+ /**
+  * Opens the stream this reader consumes. Override this method to return a
+  * specialized {@link FSDataInputStream}. The default implementation opens
+  * {@code file} with {@code bufferSize} and ignores {@code length}.
+  */
+ protected FSDataInputStream openFile(FileSystem fs, Path file,
+     int bufferSize, long length) throws IOException {
+   FSDataInputStream stream = fs.open(file, bufferSize);
+   return stream;
+ }
+
+ /**
+ * Initialize the {@link Reader} by parsing the file header at the current
+ * stream position and, unless this is a temporary reader, allocating the
+ * buffers, codec streams and deserializers used to read records.
+ *
+ * <p>Header fields consumed, in order: three magic bytes and a version
+ * byte; the key and value class names (UTF8 before BLOCK_COMPRESS_VERSION,
+ * Text afterwards); the compressed flag (version &gt; 2); the
+ * block-compressed flag (version &gt;= BLOCK_COMPRESS_VERSION); the codec
+ * class name (compressed files, version &gt;= CUSTOM_COMPRESS_VERSION);
+ * metadata (version &gt;= VERSION_WITH_METADATA); the sync marker
+ * (version &gt; 1).
+ *
+ * @param tempReader <code>true</code> if we are constructing a temporary
+ * reader (e.g. for {@code SequenceFile.Sorter.cloneFileAttributes}),
+ * and hence only the header is parsed;
+ * <code>false</code> otherwise.
+ * @throws IOException if the magic bytes do not match or the header cannot
+ * be read
+ * @throws VersionMismatchException if the version byte is newer than
+ * {@code VERSION[3]}
+ * @throws IllegalArgumentException if the named codec class is not found
+ * (asSubclass may also throw ClassCastException for a non-codec class)
+ */
+ private void init(boolean tempReader) throws IOException {
+ // Three magic bytes followed by the format version byte.
+ byte[] versionBlock = new byte[VERSION.length];
+ in.readFully(versionBlock);
+
+ if ((versionBlock[0] != VERSION[0]) ||
+ (versionBlock[1] != VERSION[1]) ||
+ (versionBlock[2] != VERSION[2]))
+ throw new IOException(file + " not a SequenceFile");
+
+ // Set 'version'
+ version = versionBlock[3];
+ if (version > VERSION[3])
+ throw new VersionMismatchException(VERSION[3], version);
+
+ // Older files store the class names as UTF8, newer ones as Text.
+ if (version < BLOCK_COMPRESS_VERSION) {
+ UTF8 className = new UTF8();
+
+ className.readFields(in);
+ keyClassName = className.toString(); // key class name
+
+ className.readFields(in);
+ valClassName = className.toString(); // val class name
+ } else {
+ keyClassName = Text.readString(in);
+ valClassName = Text.readString(in);
+ }
+
+ if (version > 2) { // if version > 2
+ this.decompress = in.readBoolean(); // is compressed?
+ } else {
+ decompress = false;
+ }
+
+ if (version >= BLOCK_COMPRESS_VERSION) { // if version >= 4
+ this.blockCompressed = in.readBoolean(); // is block-compressed?
+ } else {
+ blockCompressed = false;
+ }
+
+ // if version >= 5
+ // setup the compression codec: named in the header for newer files,
+ // DefaultCodec for older compressed files.
+ if (decompress) {
+ if (version >= CUSTOM_COMPRESS_VERSION) {
+ String codecClassname = Text.readString(in);
+ try {
+ Class<? extends CompressionCodec> codecClass
+ = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class);
+ this.codec =
+ (CompressionCodec)ReflectionUtils.newInstance(codecClass, conf);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IllegalArgumentException("Unknown codec: " +
+ codecClassname, cnfe);
+ }
+ } else {
+ codec = new DefaultCodec();
+ ((Configurable)codec).setConf(conf);
+ }
+ }
+
+ // Metadata is always non-null; it is only populated for newer versions.
+ this.metadata = new Metadata();
+ if (version >= VERSION_WITH_METADATA) { // if version >= 6
+ this.metadata.readFields(in);
+ }
+
+ if (version > 1) { // if version > 1
+ in.readFully(sync); // read sync bytes
+ }
+
+ // Allocate the reading machinery only for non-temporary readers.
+ if (!tempReader) {
+ // valIn reads values; for compressed files it decompresses valBuffer.
+ valBuffer = new DataInputBuffer();
+ if (decompress) {
+ valDecompressor = CodecPool.getDecompressor(codec);
+ valInFilter = codec.createInputStream(valBuffer, valDecompressor);
+ valIn = new DataInputStream(valInFilter);
+ } else {
+ valIn = valBuffer;
+ }
+
+ // Block-compressed files also have separately compressed sections for
+ // key lengths, keys and value lengths, each with a pooled decompressor.
+ if (blockCompressed) {
+ keyLenBuffer = new DataInputBuffer();
+ keyBuffer = new DataInputBuffer();
+ valLenBuffer = new DataInputBuffer();
+
+ keyLenDecompressor = CodecPool.getDecompressor(codec);
+ keyLenInFilter = codec.createInputStream(keyLenBuffer,
+ keyLenDecompressor);
+ keyLenIn = new DataInputStream(keyLenInFilter);
+
+ keyDecompressor = CodecPool.getDecompressor(codec);
+ keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
+ keyIn = new DataInputStream(keyInFilter);
+
+ valLenDecompressor = CodecPool.getDecompressor(codec);
+ valLenInFilter = codec.createInputStream(valLenBuffer,
+ valLenDecompressor);
+ valLenIn = new DataInputStream(valLenInFilter);
+ }
+
+ // Record-level files read the key from valBuffer (which next() loads
+ // with the whole record); block-compressed files read keys from keyIn.
+ SerializationFactory serializationFactory =
+ new SerializationFactory(conf);
+ this.keyDeserializer =
+ getDeserializer(serializationFactory, getKeyClass());
+ if (!blockCompressed) {
+ this.keyDeserializer.open(valBuffer);
+ } else {
+ this.keyDeserializer.open(keyIn);
+ }
+ this.valDeserializer =
+ getDeserializer(serializationFactory, getValueClass());
+ this.valDeserializer.open(valIn);
+ }
+ }
+
+ /** Looks up the deserializer for {@code c}, confining the unchecked call. */
+ @SuppressWarnings("unchecked")
+ private Deserializer getDeserializer(SerializationFactory sf, Class c) {
+   Deserializer deserializer = sf.getDeserializer(c);
+   return deserializer;
+ }
+
+ /**
+  * Close the file.
+  *
+  * <p>Returns any pooled decompressors, closes the key and value
+  * deserializers, and finally closes the input stream. Each later step runs
+  * even if an earlier close throws, so a failing deserializer no longer
+  * leaks the underlying stream.
+  */
+ public synchronized void close() throws IOException {
+   // Return the decompressors to the pool
+   CodecPool.returnDecompressor(keyLenDecompressor);
+   CodecPool.returnDecompressor(keyDecompressor);
+   CodecPool.returnDecompressor(valLenDecompressor);
+   CodecPool.returnDecompressor(valDecompressor);
+   keyLenDecompressor = keyDecompressor = null;
+   valLenDecompressor = valDecompressor = null;
+
+   try {
+     if (keyDeserializer != null) {
+       keyDeserializer.close();
+     }
+   } finally {
+     try {
+       if (valDeserializer != null) {
+         valDeserializer.close();
+       }
+     } finally {
+       // Close the input-stream
+       in.close();
+     }
+   }
+ }
+
+ /** @return the key class name recorded in the file header. */
+ public String getKeyClassName() {
+   return keyClassName;
+ }
+
+ /**
+  * Resolves (on first use) and returns the key class named in the header.
+  *
+  * @throws RuntimeException wrapping the IOException if resolution fails
+  */
+ public synchronized Class<?> getKeyClass() {
+   if (keyClass != null) {
+     return keyClass;
+   }
+   try {
+     keyClass = WritableName.getClass(getKeyClassName(), conf);
+   } catch (IOException e) {
+     throw new RuntimeException(e);
+   }
+   return keyClass;
+ }
+
+ /** @return the value class name recorded in the file header. */
+ public String getValueClassName() {
+   return valClassName;
+ }
+
+ /**
+  * Resolves (on first use) and returns the value class named in the header.
+  *
+  * @throws RuntimeException wrapping the IOException if resolution fails
+  */
+ public synchronized Class<?> getValueClass() {
+   if (valClass != null) {
+     return valClass;
+   }
+   try {
+     valClass = WritableName.getClass(getValueClassName(), conf);
+   } catch (IOException e) {
+     throw new RuntimeException(e);
+   }
+   return valClass;
+ }
+
+ /** @return {@code true} when the header says values are compressed. */
+ public boolean isCompressed() {
+   return this.decompress;
+ }
+
+ /** @return {@code true} when the header says records are block-compressed. */
+ public boolean isBlockCompressed() {
+   return this.blockCompressed;
+ }
+
+ /** @return the codec named by the header, or {@code null} if uncompressed. */
+ public CompressionCodec getCompressionCodec() {
+   return this.codec;
+ }
+
+ /** @return the metadata object read from the file header. */
+ public Metadata getMetadata() {
+   return this.metadata;
+ }
+
+ /** @return the configuration this reader was opened with. */
+ Configuration getConf() {
+   return this.conf;
+ }
+
+ /** Read a compressed buffer */
+ private synchronized void readBuffer(DataInputBuffer buffer,
+ CompressionInputStream filter) throws IOException {
+ // Read data into a temporary buffer
+ DataOutputBuffer dataBuffer = new DataOutputBuffer();
+
+ try {
+ int dataBufferLength = WritableUtils.readVInt(in);
+ dataBuffer.write(in, dataBufferLength);
+
+ // Set up 'buffer' connected to the input-stream
+ buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
+ } finally {
+ dataBuffer.close();
+ }
+
+ // Reset the codec
+ filter.resetState();
+ }
+
+ /**
+ * Read the next 'compressed' block.
+ *
+ * <p>Block layout consumed: an int (the sync escape, read and discarded)
+ * plus the sync bytes; a vint record count; then the key-lengths and keys
+ * sections (each read by readBuffer). The value-lengths and values
+ * sections are read here only when lazyDecompress is off; otherwise
+ * seekToCurrentValue() reads them on first use.
+ *
+ * @throws IOException "File is corrupt!" if the sync bytes do not match;
+ * an EOFException at end of file (callers treat that as end of data)
+ */
+ private synchronized void readBlock() throws IOException {
+ // Check if we need to throw away a whole block of
+ // 'values' due to 'lazy decompression'
+ // (each seek skips one vint-length-prefixed section: value lengths,
+ // then values; the vint is read before getPos() is evaluated)
+ if (lazyDecompress && !valuesDecompressed) {
+ in.seek(WritableUtils.readVInt(in)+in.getPos());
+ in.seek(WritableUtils.readVInt(in)+in.getPos());
+ }
+
+ // Reset internal states
+ noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0;
+ valuesDecompressed = false;
+
+ //Process sync
+ if (sync != null) {
+ in.readInt();
+ in.readFully(syncCheck); // read syncCheck
+ if (!Arrays.equals(sync, syncCheck)) // check it
+ throw new IOException("File is corrupt!");
+ }
+ syncSeen = true;
+
+ // Read number of records in this block
+ noBufferedRecords = WritableUtils.readVInt(in);
+
+ // Read key lengths and keys
+ readBuffer(keyLenBuffer, keyLenInFilter);
+ readBuffer(keyBuffer, keyInFilter);
+ noBufferedKeys = noBufferedRecords;
+
+ // Read value lengths and values
+ if (!lazyDecompress) {
+ readBuffer(valLenBuffer, valLenInFilter);
+ readBuffer(valBuffer, valInFilter);
+ noBufferedValues = noBufferedRecords;
+ valuesDecompressed = true;
+ }
+ }
+
+ /**
+ * Position valLenIn/valIn to the 'value'
+ * corresponding to the 'current' key.
+ *
+ * <p>Record-level files: rewinds valBuffer (to the mark set after the key
+ * was read) and resets the codec stream if values are compressed.
+ * Block-compressed files: loads the value sections on first use (lazy
+ * decompression), then skips the values of keys that were read without
+ * their values.
+ */
+ private synchronized void seekToCurrentValue() throws IOException {
+ if (!blockCompressed) {
+ if (decompress) {
+ valInFilter.resetState();
+ }
+ valBuffer.reset();
+ } else {
+ // Check if this is the first value in the 'block' to be read
+ if (lazyDecompress && !valuesDecompressed) {
+ // Read the value lengths and values
+ readBuffer(valLenBuffer, valLenInFilter);
+ readBuffer(valBuffer, valInFilter);
+ noBufferedValues = noBufferedRecords;
+ valuesDecompressed = true;
+ }
+
+ // Calculate the no. of bytes to skip
+ // Note: 'current' key has already been read!
+ // noBufferedKeys was already decremented for it, so the current record
+ // is the one with (noBufferedKeys + 1) records remaining; skip every
+ // value with more remaining than that.
+ int skipValBytes = 0;
+ int currentKey = noBufferedKeys + 1;
+ for (int i=noBufferedValues; i > currentKey; --i) {
+ skipValBytes += WritableUtils.readVInt(valLenIn);
+ --noBufferedValues;
+ }
+
+ // Skip to the 'val' corresponding to 'current' key
+ if (skipValBytes > 0) {
+ if (valIn.skipBytes(skipValBytes) != skipValBytes) {
+ throw new IOException("Failed to seek to " + currentKey +
+ "(th) value!");
+ }
+ }
+ }
+ }
+
+ /**
+  * Get the 'value' corresponding to the last read 'key'.
+  *
+  * @param val : The 'value' to be read; configured first if it is
+  *     {@link Configurable}.
+  * @throws IOException if, for a record-level file, a positive byte follows
+  *     the value (the value was not fully consumed), or on a read failure
+  */
+ public synchronized void getCurrentValue(Writable val)
+     throws IOException {
+   if (val instanceof Configurable) {
+     ((Configurable) val).setConf(this.conf);
+   }
+
+   seekToCurrentValue();
+
+   if (blockCompressed) {
+     final int valLength = WritableUtils.readVInt(valLenIn);
+     val.readFields(valIn);
+     noBufferedValues--;
+     if (valLength < 0) {
+       LOG.debug(val + " is a zero-length value");
+     }
+     return;
+   }
+
+   val.readFields(valIn);
+   if (valIn.read() > 0) {
+     LOG.info("available bytes: " + valIn.available());
+     throw new IOException(val + " read " + (valBuffer.getPosition() - keyLength)
+         + " bytes, should read " + (valBuffer.getLength() - keyLength));
+   }
+ }
+
+ /**
+  * Get the 'value' corresponding to the last read 'key', using the
+  * configured value deserializer.
+  *
+  * @param val : The 'value' to be read (passed to the deserializer);
+  *     configured first if it is {@link Configurable}.
+  * @return the object returned by the deserializer
+  * @throws IOException if, for a record-level file, a positive byte follows
+  *     the value (the value was not fully consumed), or on a read failure
+  */
+ public synchronized Object getCurrentValue(Object val)
+     throws IOException {
+   if (val instanceof Configurable) {
+     ((Configurable) val).setConf(this.conf);
+   }
+
+   seekToCurrentValue();
+
+   if (blockCompressed) {
+     final int valLength = WritableUtils.readVInt(valLenIn);
+     val = deserializeValue(val);
+     noBufferedValues--;
+     if (valLength < 0) {
+       LOG.debug(val + " is a zero-length value");
+     }
+     return val;
+   }
+
+   val = deserializeValue(val);
+   if (valIn.read() > 0) {
+     LOG.info("available bytes: " + valIn.available());
+     throw new IOException(val + " read " + (valBuffer.getPosition() - keyLength)
+         + " bytes, should read " + (valBuffer.getLength() - keyLength));
+   }
+   return val;
+ }
+
+ /** Runs the value deserializer on {@code val}, confining the unchecked call. */
+ @SuppressWarnings("unchecked")
+ private Object deserializeValue(Object val) throws IOException {
+   Object result = valDeserializer.deserialize(val);
+   return result;
+ }
+
+ /** Read the next key in the file into <code>key</code>, skipping its
+ * value. True if another entry exists, and false at end of file.
+ *
+ * <p>Record-level files: the whole record is copied into outBuf, valBuffer
+ * is pointed at it, the key is read, and the position after the key is
+ * marked so seekToCurrentValue() can rewind valBuffer to the value.
+ * Block-compressed files: the next key is read from the current block,
+ * loading a new block when the current one is exhausted.
+ *
+ * @throws IOException if <code>key</code> is not exactly the key class, or
+ * if the key does not consume exactly the recorded key length */
+ public synchronized boolean next(Writable key) throws IOException {
+ if (key.getClass() != getKeyClass())
+ throw new IOException("wrong key class: "+key.getClass().getName()
+ +" is not "+keyClass);
+
+ if (!blockCompressed) {
+ outBuf.reset();
+
+ keyLength = next(outBuf);
+ if (keyLength < 0)
+ return false;
+
+ valBuffer.reset(outBuf.getData(), outBuf.getLength());
+
+ key.readFields(valBuffer);
+ valBuffer.mark(0);
+ if (valBuffer.getPosition() != keyLength)
+ throw new IOException(key + " read " + valBuffer.getPosition()
+ + " bytes, should read " + keyLength);
+ } else {
+ //Reset syncSeen
+ syncSeen = false;
+
+ if (noBufferedKeys == 0) {
+ try {
+ readBlock();
+ } catch (EOFException eof) {
+ return false;
+ }
+ }
+
+ // NOTE: this local shadows the keyLength field, which is left untouched
+ // on the block-compressed path.
+ int keyLength = WritableUtils.readVInt(keyLenIn);
+
+ // Sanity check
+ if (keyLength < 0) {
+ return false;
+ }
+
+ //Read another compressed 'key'
+ key.readFields(keyIn);
+ --noBufferedKeys;
+ }
+
+ return true;
+ }
+
+ /**
+  * Read the next key/value pair in the file into <code>key</code> and
+  * <code>val</code>.
+  *
+  * @return true if such a pair exists, false at end of file
+  * @throws IOException if <code>val</code> is not exactly the value class
+  */
+ public synchronized boolean next(Writable key, Writable val)
+     throws IOException {
+   if (val.getClass() != getValueClass()) {
+     throw new IOException("wrong value class: " + val + " is not " + valClass);
+   }
+   if (!next(key)) {
+     return false;
+   }
+   getCurrentValue(val);
+   return true;
+ }
+
+ /**
+ * Read and return the next record length, potentially skipping over
+ * a sync block.
+ *
+ * <p>A length equal to SYNC_ESCAPE (for version &gt; 1 files) marks a sync
+ * point: the following sync bytes are verified, syncSeen is set, and the
+ * real record length is read after it. Reading stops once the stream
+ * position reaches <code>end</code>, checked both before and after a sync.
+ *
+ * @return the length of the next record or -1 if there is no next record
+ * @throws IOException "File is corrupt!" on a sync mismatch, or on a read
+ * failure
+ */
+ private synchronized int readRecordLength() throws IOException {
+ if (in.getPos() >= end) {
+ return -1;
+ }
+ int length = in.readInt();
+ if (version > 1 && sync != null &&
+ length == SYNC_ESCAPE) { // process a sync entry
+ in.readFully(syncCheck); // read syncCheck
+ if (!Arrays.equals(sync, syncCheck)) // check it
+ throw new IOException("File is corrupt!");
+ syncSeen = true;
+ if (in.getPos() >= end) {
+ return -1;
+ }
+ length = in.readInt(); // re-read length
+ } else {
+ syncSeen = false;
+ }
+
+ return length;
+ }
+
+ /**
+  * Read the next key/value pair in the file into <code>buffer</code>.
+  * Returns the length of the key read, or -1 if at end of file. The whole
+  * record (key then value) is appended to <code>buffer</code>, so the
+  * length of the value may be computed by calling buffer.getLength()
+  * before and after calls to this method.
+  *
+  * <p>On a {@link ChecksumException} the exception is passed to
+  * {@code handleChecksumException}. If that returns normally, the read is
+  * retried. The retry is a loop rather than recursion, so a long run of
+  * corrupt data cannot exhaust the stack.
+  *
+  * @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}.
+  * @throws IOException if the file is block-compressed, or on a read failure
+  */
+ @Deprecated
+ public synchronized int next(DataOutputBuffer buffer) throws IOException {
+   // Unsupported for block-compressed sequence files
+   if (blockCompressed) {
+     throw new IOException("Unsupported call for block-compressed" +
+         " SequenceFiles - use SequenceFile.Reader.nextRaw(DataOutputBuffer, ValueBytes)");
+   }
+   while (true) {
+     try {
+       int length = readRecordLength();
+       if (length == -1) {
+         return -1;
+       }
+       int keyLength = in.readInt();
+       buffer.write(in, length);
+       return keyLength;
+     } catch (ChecksumException e) { // checksum failure
+       handleChecksumException(e);
+     }
+   }
+ }
+
+ /**
+  * Creates a {@link ValueBytes} holder for use with nextRaw(): compressed
+  * bytes for record-compressed files, uncompressed bytes for uncompressed
+  * and block-compressed files.
+  */
+ public ValueBytes createValueBytes() {
+   if (decompress && !blockCompressed) {
+     return new CompressedBytes(codec);
+   }
+   return new UncompressedBytes();
+ }
+
+ /**
+ * Read 'raw' records.
+ *
+ * <p>The key bytes are appended to <code>key</code>. The value's bytes are
+ * handed to <code>val</code>, which must be CompressedBytes for
+ * record-compressed files and UncompressedBytes otherwise (see
+ * createValueBytes()); any other type fails with a ClassCastException.
+ *
+ * <p>NOTE(review): unlike next(DataOutputBuffer), ChecksumExceptions are not
+ * routed through handleChecksumException here.
+ *
+ * @param key - The buffer into which the key is read
+ * @param val - The 'raw' value
+ * @return Returns the total record length or -1 for end of file
+ * @throws IOException on a read failure, or "zero length key found!" when a
+ * block contains a negative key length
+ */
+ public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val)
+ throws IOException {
+ if (!blockCompressed) {
+ int length = readRecordLength();
+ if (length == -1) {
+ return -1;
+ }
+ int keyLength = in.readInt();
+ int valLength = length - keyLength;
+ key.write(in, keyLength);
+ if (decompress) {
+ CompressedBytes value = (CompressedBytes)val;
+ value.reset(in, valLength);
+ } else {
+ UncompressedBytes value = (UncompressedBytes)val;
+ value.reset(in, valLength);
+ }
+
+ return length;
+ } else {
+ //Reset syncSeen
+ syncSeen = false;
+
+ // Read 'key'
+ if (noBufferedKeys == 0) {
+ if (in.getPos() >= end)
+ return -1;
+
+ try {
+ readBlock();
+ } catch (EOFException eof) {
+ return -1;
+ }
+ }
+ int keyLength = WritableUtils.readVInt(keyLenIn);
+ if (keyLength < 0) {
+ throw new IOException("zero length key found!");
+ }
+ key.write(keyIn, keyLength);
+ --noBufferedKeys;
+
+ // Read raw 'value' (values are already decompressed into valIn)
+ seekToCurrentValue();
+ int valLength = WritableUtils.readVInt(valLenIn);
+ UncompressedBytes rawValue = (UncompressedBytes)val;
+ rawValue.reset(valIn, valLength);
+ --noBufferedValues;
+
+ return (keyLength+valLength);
+ }
+
+ }
+
+ /**
+  * Read 'raw' keys.
+  *
+  * <p>Synchronized like the other record-reading methods of this reader.
+  * It advances the same shared stream position and block state
+  * ({@code recordLength}, {@code keyLength}, {@code noBufferedKeys},
+  * {@code syncSeen}), so unsynchronized use could interleave with
+  * concurrent reads.
+  *
+  * @param key - The buffer into which the key is read
+  * @return Returns the key length or -1 for end of file
+  * @throws IOException on a read failure, or "zero length key found!" when a
+  *     block contains a negative key length
+  */
+ public synchronized int nextRawKey(DataOutputBuffer key)
+     throws IOException {
+   if (!blockCompressed) {
+     recordLength = readRecordLength();
+     if (recordLength == -1) {
+       return -1;
+     }
+     keyLength = in.readInt();
+     key.write(in, keyLength);
+     return keyLength;
+   } else {
+     //Reset syncSeen
+     syncSeen = false;
+
+     // Read 'key', loading the next block once the current one is exhausted
+     if (noBufferedKeys == 0) {
+       if (in.getPos() >= end)
+         return -1;
+
+       try {
+         readBlock();
+       } catch (EOFException eof) {
+         return -1;
+       }
+     }
+     int keyLength = WritableUtils.readVInt(keyLenIn);
+     if (keyLength < 0) {
+       throw new IOException("zero length key found!");
+     }
+     key.write(keyIn, keyLength);
+     --noBufferedKeys;
+
+     return keyLength;
+   }
+ }
+
+ /** Read the next key in the file, skipping its
+ * value. Return null at end of file. */
+ public synchronized Object next(Object key) throws IOException {
+ if (key != null && key.getClass() != getKeyClass()) {
+ throw new IOException("wrong key class: "+key.getClass().getName()
+ +" is not "+keyClass);
+ }
+
+ if (!blockCompressed) {
+ outBuf.reset();
+
+ keyLength = next(outBuf);
+ if (keyLength < 0)
+ return null;
+
+ valBuffer.reset(outBuf.getData(), outBuf.getLength());
+
+ key = deserializeKey(key);
+ valBuffer.mark(0);
+ if (valBuffer.getPosition() != keyLength)
+ throw new IOException(key + " read " + valBuffer.getPosition()
+ + " bytes, should read " + keyLength);
+ } else {
+ //Reset syncSeen
+ syncSeen = false;
+
+ if (noBufferedKeys == 0) {
+ try {
+ readBlock();
+ } catch (EOFException eof) {
+ return null;
+ }
+ }
+
+ int keyLength = WritableUtils.readVInt(keyLenIn);
+
+ // Sanity check
+ if (keyLength < 0) {
+ return null;
+ }
+
+ //Read another compressed 'key'
+ key = deserializeKey(key);
+ --noBufferedKeys;
+ }
+
+ return key;
+ }
+
+ @SuppressWarnings("unchecked")
+ private Object deserializeKey(Object key) throws IOException {
+ return keyDeserializer.deserialize(key);
+ }
+
+ /**
+ * Read 'raw' values.
+ * @param val - The 'raw' value
+ * @return Returns the value length
+ * @throws IOException
+ */
+ public synchronized int nextRawValue(ValueBytes val)
+ throws IOException {
+
+ // Position stream to current value
+ seekToCurrentValue();
+
+ if (!blockCompressed) {
+ int valLength = recordLength - keyLength;
+ if (decompress) {
+ CompressedBytes value = (CompressedBytes)val;
+ value.reset(in, valLength);
+ } else {
+ UncompressedBytes value = (UncompressedBytes)val;
+ value.reset(in, valLength);
+ }
+
+ return valLength;
+ } else {
+ int valLength = WritableUtils.readVInt(valLenIn);
+ UncompressedBytes rawValue = (UncompressedBytes)val;
+ rawValue.reset(valIn, valLength);
+ --noBufferedValues;
+ return valLength;
+ }
+
+ }
+
+ private void handleChecksumException(ChecksumException e)
+ throws IOException {
+ if (this.conf.getBoolean("io.skip.checksum.errors", false)) {
+ LOG.warn("Bad checksum at "+getPosition()+". Skipping entries.");
+ sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512));
+ } else {
+ throw e;
+ }
+ }
+
+ /** Set the current byte position in the input file.
+ *
+ * <p>The position passed must be a position returned by {@link
+ * SequenceFile.Writer#getLength()} when writing this file. To seek to an arbitrary
+ * position, use {@link SequenceFile.Reader#sync(long)}.
+ */
+ public synchronized void seek(long position) throws IOException {
+ in.seek(position);
+ if (blockCompressed) { // trigger block read
+ noBufferedKeys = 0;
+ valuesDecompressed = true;
+ }
+ }
+
+ /** Seek to the next sync mark past a given position.*/
+ public synchronized void sync(long position) throws IOException {
+ if (position+SYNC_SIZE >= end) {
+ seek(end);
+ return;
+ }
+
+ try {
+ seek(position+4); // skip escape
+ in.readFully(syncCheck);
+ int syncLen = sync.length;
+ for (int i = 0; in.getPos() < end; i++) {
+ int j = 0;
+ for (; j < syncLen; j++) {
+ if (sync[j] != syncCheck[(i+j)%syncLen])
+ break;
+ }
+ if (j == syncLen) {
+ in.seek(in.getPos() - SYNC_SIZE); // position before sync
+ return;
+ }
+ syncCheck[i%syncLen] = in.readByte();
+ }
+ } catch (ChecksumException e) { // checksum failure
+ handleChecksumException(e);
+ }
+ }
+
+ /** Returns true iff the previous call to next passed a sync mark.*/
+ public boolean syncSeen() { return syncSeen; }
+
+ /** Return the current byte position in the input file. */
+ public synchronized long getPosition() throws IOException {
+ return in.getPos();
+ }
+
+ /** Returns the name of the file. */
+ public String toString() {
+ return file.toString();
+ }
+
+ }
+
+ /** Sorts key/value pairs in a sequence-format file.
+ *
+ * <p>For best performance, applications should make sure that the {@link
+ * Writable#readFields(DataInput)} implementation of their keys is
+ * very efficient. In particular, it should avoid allocating memory.
+ */
+ public static class Sorter {
+
+ private RawComparator comparator;
+
+ private MergeSort mergeSort; //the implementation of merge sort
+
+ private Path[] inFiles; // when merging or sorting
+
+ private Path outFile;
+
+ private int memory; // bytes
+ private int factor; // merged per pass
+
+ private FileSystem fs = null;
+
+ private Class keyClass;
+ private Class valClass;
+
+ private Configuration conf;
+
+ private Progressable progressable = null;
+
/**
 * Sort and merge files containing the named classes.
 *
 * <p>Keys are compared with the comparator returned by
 * {@code WritableComparator.get(keyClass)}.
 */
public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass,
              Class valClass, Configuration conf) {
  this(fs, WritableComparator.get(keyClass), keyClass, valClass, conf);
}
+
+ /** Sort and merge using an arbitrary {@link RawComparator}. */
+ public Sorter(FileSystem fs, RawComparator comparator, Class keyClass,
+ Class valClass, Configuration conf) {
+ this.fs = fs;
+ this.comparator = comparator;
+ this.keyClass = keyClass;
+ this.valClass = valClass;
+ this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;
+ this.factor = conf.getInt("io.sort.factor", 100);
+ this.conf = conf;
+ }
+
+ /** Set the number of streams to merge at once.*/
+ public void setFactor(int factor) { this.factor = factor; }
+
+ /** Get the number of streams to merge at once.*/
+ public int getFactor() { return factor; }
+
+ /** Set the total amount of buffer memory, in bytes.*/
+ public void setMemory(int memory) { this.memory = memory; }
+
+ /** Get the total amount of buffer memory, in bytes.*/
+ public int getMemory() { return memory; }
+
+ /** Set the progressable object in order to report progress. */
+ public void setProgressable(Progressable progressable) {
+ this.progressable = progressable;
+ }
+
+ /**
+ * Perform a file sort from a set of input files into an output file.
+ * @param inFiles the files to be sorted
+ * @param outFile the sorted output file
+ * @param deleteInput should the input files be deleted as they are read?
+ */
+ public void sort(Path[] inFiles, Path outFile,
+ boolean deleteInput) throws IOException {
+ if (fs.exists(outFile)) {
+ throw new IOException("already exists: " + outFile);
+ }
+
+ this.inFiles = inFiles;
+ this.outFile = outFile;
+
+ int segments = sortPass(deleteInput);
+ if (segments > 1) {
+ mergePass(outFile.getParent());
+ }
+ }
+
+ /**
+ * Perform a file sort from a set of input files and return an iterator.
+ * @param inFiles the files to be sorted
+ * @param tempDir the directory where temp files are created during sort
+ * @param deleteInput should the input files be deleted as they are read?
+ * @return iterator the RawKeyValueIterator
+ */
+ public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir,
+ boolean deleteInput) throws IOException {
+ Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
+ if (fs.exists(outFile)) {
+ throw new IOException("already exists: " + outFile);
+ }
+ this.inFiles = inFiles;
+ //outFile will basically be used as prefix for temp files in the cases
+ //where sort outputs multiple sorted segments. For the single segment
+ //case, the outputFile itself will contain the sorted data for that
+ //segment
+ this.outFile = outFile;
+
+ int segments = sortPass(deleteInput);
+ if (segments > 1)
+ return merge(outFile.suffix(".0"), outFile.suffix(".0.index"),
+ tempDir);
+ else if (segments == 1)
+ return merge(new Path[]{outFile}, true, tempDir);
+ else return null;
+ }
+
+ /**
+ * The backwards compatible interface to sort.
+ * @param inFile the input file to sort
+ * @param outFile the sorted output file
+ */
+ public void sort(Path inFile, Path outFile) throws IOException {
+ sort(new Path[]{inFile}, outFile, false);
+ }
+
+ private int sortPass(boolean deleteInput) throws IOException {
+ LOG.debug("running sort pass");
+ SortPass sortPass = new SortPass(); // make the SortPass
+ sortPass.setProgressable(progressable);
+ mergeSort = new MergeSort(sortPass.new SeqFileComparator());
+ try {
+ return sortPass.run(deleteInput); // run it
+ } finally {
+ sortPass.close(); // close it
+ }
+ }
+
+ private class SortPass {
+ private int memoryLimit = memory/4;
+ private int recordLimit = 1000000;
+
+ private DataOutputBuffer rawKeys = new DataOutputBuffer();
+ private byte[] rawBuffer;
+
+ private int[] keyOffsets = new int[1024];
+ private int[] pointers = new int[keyOffsets.length];
+ private int[] pointersCopy = new int[keyOffsets.length];
+ private int[] keyLengths = new int[keyOffsets.length];
+ private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length];
+
+ private ArrayList segmentLengths = new ArrayList();
+
+ private Reader in = null;
+ private FSDataOutputStream out = null;
+ private FSDataOutputStream indexOut = null;
+ private Path outName;
+
+ private Progressable progressable = null;
+
+ public int run(boolean deleteInput) throws IOException {
+ int segments = 0;
+ int currentFile = 0;
+ boolean atEof = (currentFile >= inFiles.length);
+ boolean isCompressed = false;
+ boolean isBlockCompressed = false;
+ CompressionCodec codec = null;
+ segmentLengths.clear();
+ if (atEof) {
+ return 0;
+ }
+
+ // Initialize
+ in = new Reader(fs, inFiles[currentFile], conf);
+ isCompressed = in.isCompressed();
+ isBlockCompressed = in.isBlockCompressed();
+ codec = in.getCompressionCodec();
+
+ for (int i=0; i < rawValues.length; ++i) {
+ rawValues[i] = null;
+ }
+
+ while (!atEof) {
+ int count = 0;
+ int bytesProcessed = 0;
+ rawKeys.reset();
+ while (!atEof &&
+ bytesProcessed < memoryLimit && count < recordLimit) {
+
+ // Read a record into buffer
+ // Note: Attempt to re-use 'rawValue' as far as possible
+ int keyOffset = rawKeys.getLength();
+ ValueBytes rawValue =
+ (count == keyOffsets.length || rawValues[count] == null) ?
+ in.createValueBytes() :
+ rawValues[count];
+ int recordLength = in.nextRaw(rawKeys, rawValue);
+ if (recordLength == -1) {
+ in.close();
+ if (deleteInput) {
+ fs.delete(inFiles[currentFile], true);
+ }
+ currentFile += 1;
+ atEof = currentFile >= inFiles.length;
+ if (!atEof) {
+ in = new Reader(fs, inFiles[currentFile], conf);
+ } else {
+ in = null;
+ }
+ continue;
+ }
+
+ int keyLength = rawKeys.getLength() - keyOffset;
+
+ if (count == keyOffsets.length)
+ grow();
+
+ keyOffsets[count] = keyOffset; // update pointers
+ pointers[count] = count;
+ keyLengths[count] = keyLength;
+ rawValues[count] = rawValue;
+
+ bytesProcessed += recordLength;
+ count++;
+ }
+
+ // buffer is full -- sort & flush it
+ LOG.debug("flushing segment " + segments);
+ rawBuffer = rawKeys.getData();
+ sort(count);
+ // indicate we're making progress
+ if (progressable != null) {
+ progressable.progress();
+ }
+ flush(count, bytesProcessed, isCompressed, isBlockCompressed, codec,
+ segments==0 && atEof);
+ segments++;
+ }
+ return segments;
+ }
+
+ public void close() throws IOException {
+ if (in != null) {
+ in.close();
+ }
+ if (out != null) {
+ out.close();
+ }
+ if (indexOut != null) {
+ indexOut.close();
+ }
+ }
+
+ private void grow() {
+ int newLength = keyOffsets.length * 3 / 2;
+ keyOffsets = grow(keyOffsets, newLength);
+ pointers = grow(pointers, newLength);
+ pointersCopy = new int[newLength];
+ keyLengths = grow(keyLengths, newLength);
+ rawValues = grow(rawValues, newLength);
+ }
+
+ private int[] grow(int[] old, int newLength) {
+ int[] result = new int[newLength];
+ System.arraycopy(old, 0, result, 0, old.length);
+ return result;
+ }
+
+ private ValueBytes[] grow(ValueBytes[] old, int newLength) {
+ ValueBytes[] result = new ValueBytes[newLength];
+ System.arraycopy(old, 0, result, 0, old.length);
+ for (int i=old.length; i < newLength; ++i) {
+ result[i] = null;
+ }
+ return result;
+ }
+
[... 722 lines stripped ...]