You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2016/04/27 23:16:43 UTC
[11/22] incubator-geode git commit: GEODE-1072: Removing HDFS related
code
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/org/apache/hadoop/io/SequenceFile.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/org/apache/hadoop/io/SequenceFile.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/org/apache/hadoop/io/SequenceFile.java
deleted file mode 100644
index b13f499..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/org/apache/hadoop/io/SequenceFile.java
+++ /dev/null
@@ -1,3726 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io;
-
-import java.io.*;
-import java.util.*;
-import java.rmi.server.UID;
-import java.security.MessageDigest;
-import org.apache.commons.logging.*;
-import org.apache.hadoop.util.Options;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.Options.CreateOpts;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionInputStream;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.io.compress.zlib.ZlibFactory;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.Progress;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.NativeCodeLoader;
-import org.apache.hadoop.util.MergeSort;
-import org.apache.hadoop.util.PriorityQueue;
-import org.apache.hadoop.util.Time;
-// ** Pivotal Changes Begin
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.VersionMismatchException;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableName;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
-//** Pivotal Changes End
-
-/**
- * <code>SequenceFile</code>s are flat files consisting of binary key/value
- * pairs.
- *
- * <p><code>SequenceFile</code> provides {@link Writer}, {@link Reader} and
- * {@link Sorter} classes for writing, reading and sorting respectively.</p>
- *
- * There are three <code>SequenceFile</code> <code>Writer</code>s based on the
- * {@link CompressionType} used to compress key/value pairs:
- * <ol>
- * <li>
- * <code>Writer</code> : Uncompressed records.
- * </li>
- * <li>
- * <code>RecordCompressWriter</code> : Record-compressed files, only compress
- * values.
- * </li>
- * <li>
- * <code>BlockCompressWriter</code> : Block-compressed files, both keys &
- * values are collected in 'blocks'
- * separately and compressed. The size of
- * the 'block' is configurable.
- * </ol>
- *
- * <p>The actual compression algorithm used to compress key and/or values can be
- * specified by using the appropriate {@link CompressionCodec}.</p>
- *
- * <p>The recommended way is to use the static <tt>createWriter</tt> methods
- * provided by the <code>SequenceFile</code> to choose the preferred format.</p>
- *
- * <p>The {@link Reader} acts as the bridge and can read any of the above
- * <code>SequenceFile</code> formats.</p>
- *
- * <h4 id="Formats">SequenceFile Formats</h4>
- *
- * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s
- * depending on the <code>CompressionType</code> specified. All of them share a
- * <a href="#Header">common header</a> described below.
- *
- * <h5 id="Header">SequenceFile Header</h5>
- * <ul>
- * <li>
- * version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual
- * version number (e.g. SEQ4 or SEQ6)
- * </li>
- * <li>
- * keyClassName - key class
- * </li>
- * <li>
- * valueClassName - value class
- * </li>
- * <li>
- * compression - A boolean which specifies if compression is turned on for
- * keys/values in this file.
- * </li>
- * <li>
- * blockCompression - A boolean which specifies if block-compression is
- * turned on for keys/values in this file.
- * </li>
- * <li>
- * compression codec - <code>CompressionCodec</code> class which is used for
- * compression of keys and/or values (if compression is
- * enabled).
- * </li>
- * <li>
- * metadata - {@link Metadata} for this file.
- * </li>
- * <li>
- * sync - A sync marker to denote end of the header.
- * </li>
- * </ul>
- *
- * <h5 id="UncompressedFormat">Uncompressed SequenceFile Format</h5>
- * <ul>
- * <li>
- * <a href="#Header">Header</a>
- * </li>
- * <li>
- * Record
- * <ul>
- * <li>Record length</li>
- * <li>Key length</li>
- * <li>Key</li>
- * <li>Value</li>
- * </ul>
- * </li>
- * <li>
- * A sync-marker every few <code>100</code> bytes or so.
- * </li>
- * </ul>
- *
- * <h5 id="RecordCompressedFormat">Record-Compressed SequenceFile Format</h5>
- * <ul>
- * <li>
- * <a href="#Header">Header</a>
- * </li>
- * <li>
- * Record
- * <ul>
- * <li>Record length</li>
- * <li>Key length</li>
- * <li>Key</li>
- * <li><i>Compressed</i> Value</li>
- * </ul>
- * </li>
- * <li>
- * A sync-marker every few <code>100</code> bytes or so.
- * </li>
- * </ul>
- *
- * <h5 id="BlockCompressedFormat">Block-Compressed SequenceFile Format</h5>
- * <ul>
- * <li>
- * <a href="#Header">Header</a>
- * </li>
- * <li>
- * Record <i>Block</i>
- * <ul>
- * <li>Uncompressed number of records in the block</li>
- * <li>Compressed key-lengths block-size</li>
- * <li>Compressed key-lengths block</li>
- * <li>Compressed keys block-size</li>
- * <li>Compressed keys block</li>
- * <li>Compressed value-lengths block-size</li>
- * <li>Compressed value-lengths block</li>
- * <li>Compressed values block-size</li>
- * <li>Compressed values block</li>
- * </ul>
- * </li>
- * <li>
- * A sync-marker every block.
- * </li>
- * </ul>
- *
- * <p>The compressed blocks of key lengths and value lengths consist of the
- * actual lengths of individual keys/values encoded in ZeroCompressedInteger
- * format.</p>
- *
- * @see CompressionCodec
- */
-@InterfaceAudience.Public
-@InterfaceStability.Stable
-public class SequenceFile {
- private static final Log LOG = LogFactory.getLog(SequenceFile.class); // logger shared by the whole class
-
- private SequenceFile() {} // no public ctor: prevents instantiation from outside this class
-
- private static final byte BLOCK_COMPRESS_VERSION = (byte)4; // NOTE(review): presumably the first format version with block compression -- confirm against the reader
- private static final byte CUSTOM_COMPRESS_VERSION = (byte)5; // NOTE(review): presumably the first format version with a configurable codec -- confirm against the reader
- private static final byte VERSION_WITH_METADATA = (byte)6; // the version byte placed in VERSION below
- private static byte[] VERSION = new byte[] { // 3 magic bytes "SEQ" followed by the version byte
- (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
- };
-
- private static final int SYNC_ESCAPE = -1; // "length" of sync entries
- private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash
- private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash = 20 bytes
-
- /** The number of bytes between sync points: 100 * SYNC_SIZE = 2000. */
- public static final int SYNC_INTERVAL = 100*SYNC_SIZE;
-
- /**
- * The compression type used to compress key/value pairs in the
- * {@link SequenceFile}.
- *
- * <p>The constant names are what <code>setDefaultCompressionType</code>
- * stores in the configuration (via <code>toString()</code>) and what
- * <code>getDefaultCompressionType</code> parses back with
- * <code>valueOf</code>, so renaming a constant changes the accepted
- * configuration values.</p>
- *
- * @see SequenceFile.Writer
- */
- public static enum CompressionType {
- /** Do not compress records. */
- NONE,
- /** Compress values only, each separately. */
- RECORD,
- /** Compress sequences of records together in blocks. */
- BLOCK
- }
-
- /**
-  * Looks up the default {@link CompressionType} for sequence files in a
-  * configuration.
-  *
-  * @param job the configuration to read
-  *            <code>io.seqfile.compression.type</code> from
-  * @return the configured type, or {@link CompressionType#RECORD} when the
-  *         property is not set
-  * @throws IllegalArgumentException if the property holds a value that is
-  *         not the exact name of a {@link CompressionType} constant
-  */
- static public CompressionType getDefaultCompressionType(Configuration job) {
-   final String configured = job.get("io.seqfile.compression.type");
-   if (configured == null) {
-     return CompressionType.RECORD;
-   }
-   return CompressionType.valueOf(configured);
- }
-
- /**
-  * Stores the default compression type for sequence files in a
-  * configuration, under <code>io.seqfile.compression.type</code>.
-  *
-  * @param job the configuration to modify
-  * @param val the new compression type; its <code>toString()</code> form is
-  *            what gets stored
-  */
- static public void setDefaultCompressionType(Configuration job,
-                                              CompressionType val) {
-   final String typeName = val.toString();
-   job.set("io.seqfile.compression.type", typeName);
- }
-
- /**
-  * Creates a writer configured by the given options.
-  *
-  * <p>When <code>opts</code> contains no compression option, the default
-  * compression type is read from <code>conf</code> and prepended to the
-  * options. The compression type then selects the writer class:
-  * {@link Writer} for <code>NONE</code>, <code>RecordCompressWriter</code>
-  * for <code>RECORD</code> and <code>BlockCompressWriter</code> for
-  * <code>BLOCK</code>.</p>
-  *
-  * @param conf the configuration to use
-  * @param opts the options to create the file with
-  * @return a new Writer
-  * @throws IOException if constructing the writer fails
-  */
- public static Writer createWriter(Configuration conf, Writer.Option... opts
-                                   ) throws IOException {
-   final Writer.CompressionOption requested =
-     Options.getOption(Writer.CompressionOption.class, opts);
-   final CompressionType kind;
-   if (requested == null) {
-     // nothing requested explicitly: fall back to the configured default
-     // and make it visible to the writer constructor as an option
-     kind = getDefaultCompressionType(conf);
-     opts = Options.prependOptions(opts, Writer.compression(kind));
-   } else {
-     kind = requested.getValue();
-   }
-   switch (kind) {
-     case RECORD:
-       return new RecordCompressWriter(conf, opts);
-     case BLOCK:
-       return new BlockCompressWriter(conf, opts);
-     case NONE:
-     default:
-       return new Writer(conf, opts);
-   }
- }
-
- /**
- * Construct the preferred type of SequenceFile Writer.
- * @param fs The configured filesystem.
- * @param conf The configuration.
- * @param name The name of the file.
- * @param keyClass The 'key' type.
- * @param valClass The 'value' type.
- * @return Returns the handle to the constructed SequenceFile Writer.
- * @throws IOException
- * @deprecated Use {@link #createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)}
- * instead.
- */
- @Deprecated
- public static Writer
- createWriter(FileSystem fs, Configuration conf, Path name,
- Class keyClass, Class valClass) throws IOException {
- return createWriter(conf, Writer.filesystem(fs),
- Writer.file(name), Writer.keyClass(keyClass),
- Writer.valueClass(valClass));
- }
-
- /**
- * Construct the preferred type of SequenceFile Writer.
- * @param fs The configured filesystem.
- * @param conf The configuration.
- * @param name The name of the file.
- * @param keyClass The 'key' type.
- * @param valClass The 'value' type.
- * @param compressionType The compression type.
- * @return Returns the handle to the constructed SequenceFile Writer.
- * @throws IOException
- * @deprecated Use {@link #createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)}
- * instead.
- */
- @Deprecated
- public static Writer
- createWriter(FileSystem fs, Configuration conf, Path name,
- Class keyClass, Class valClass,
- CompressionType compressionType) throws IOException {
- return createWriter(conf, Writer.filesystem(fs),
- Writer.file(name), Writer.keyClass(keyClass),
- Writer.valueClass(valClass),
- Writer.compression(compressionType));
- }
-
- /**
- * Construct the preferred type of SequenceFile Writer.
- * @param fs The configured filesystem.
- * @param conf The configuration.
- * @param name The name of the file.
- * @param keyClass The 'key' type.
- * @param valClass The 'value' type.
- * @param compressionType The compression type.
- * @param progress The Progressable object to track progress.
- * @return Returns the handle to the constructed SequenceFile Writer.
- * @throws IOException
- * @deprecated Use {@link #createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)}
- * instead.
- */
- @Deprecated
- public static Writer
- createWriter(FileSystem fs, Configuration conf, Path name,
- Class keyClass, Class valClass, CompressionType compressionType,
- Progressable progress) throws IOException {
- return createWriter(conf, Writer.file(name),
- Writer.filesystem(fs),
- Writer.keyClass(keyClass),
- Writer.valueClass(valClass),
- Writer.compression(compressionType),
- Writer.progressable(progress));
- }
-
- /**
- * Construct the preferred type of SequenceFile Writer.
- * @param fs The configured filesystem.
- * @param conf The configuration.
- * @param name The name of the file.
- * @param keyClass The 'key' type.
- * @param valClass The 'value' type.
- * @param compressionType The compression type.
- * @param codec The compression codec.
- * @return Returns the handle to the constructed SequenceFile Writer.
- * @throws IOException
- * @deprecated Use {@link #createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)}
- * instead.
- */
- @Deprecated
- public static Writer
- createWriter(FileSystem fs, Configuration conf, Path name,
- Class keyClass, Class valClass, CompressionType compressionType,
- CompressionCodec codec) throws IOException {
- return createWriter(conf, Writer.file(name),
- Writer.filesystem(fs),
- Writer.keyClass(keyClass),
- Writer.valueClass(valClass),
- Writer.compression(compressionType, codec));
- }
-
- /**
- * Construct the preferred type of SequenceFile Writer.
- * @param fs The configured filesystem.
- * @param conf The configuration.
- * @param name The name of the file.
- * @param keyClass The 'key' type.
- * @param valClass The 'value' type.
- * @param compressionType The compression type.
- * @param codec The compression codec.
- * @param progress The Progressable object to track progress.
- * @param metadata The metadata of the file.
- * @return Returns the handle to the constructed SequenceFile Writer.
- * @throws IOException
- * @deprecated Use {@link #createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)}
- * instead.
- */
- @Deprecated
- public static Writer
- createWriter(FileSystem fs, Configuration conf, Path name,
- Class keyClass, Class valClass,
- CompressionType compressionType, CompressionCodec codec,
- Progressable progress, Metadata metadata) throws IOException {
- return createWriter(conf, Writer.file(name),
- Writer.filesystem(fs),
- Writer.keyClass(keyClass),
- Writer.valueClass(valClass),
- Writer.compression(compressionType, codec),
- Writer.progressable(progress),
- Writer.metadata(metadata));
- }
-
- /**
- * Construct the preferred type of SequenceFile Writer.
- * @param fs The configured filesystem.
- * @param conf The configuration.
- * @param name The name of the file.
- * @param keyClass The 'key' type.
- * @param valClass The 'value' type.
- * @param bufferSize buffer size for the underlaying outputstream.
- * @param replication replication factor for the file.
- * @param blockSize block size for the file.
- * @param compressionType The compression type.
- * @param codec The compression codec.
- * @param progress The Progressable object to track progress.
- * @param metadata The metadata of the file.
- * @return Returns the handle to the constructed SequenceFile Writer.
- * @throws IOException
- * @deprecated Use {@link #createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)}
- * instead.
- */
- @Deprecated
- public static Writer
- createWriter(FileSystem fs, Configuration conf, Path name,
- Class keyClass, Class valClass, int bufferSize,
- short replication, long blockSize,
- CompressionType compressionType, CompressionCodec codec,
- Progressable progress, Metadata metadata) throws IOException {
- return createWriter(conf, Writer.file(name),
- Writer.filesystem(fs),
- Writer.keyClass(keyClass),
- Writer.valueClass(valClass),
- Writer.bufferSize(bufferSize),
- Writer.replication(replication),
- Writer.blockSize(blockSize),
- Writer.compression(compressionType, codec),
- Writer.progressable(progress),
- Writer.metadata(metadata));
- }
-
- /**
- * Construct the preferred type of SequenceFile Writer.
- * @param fs The configured filesystem.
- * @param conf The configuration.
- * @param name The name of the file.
- * @param keyClass The 'key' type.
- * @param valClass The 'value' type.
- * @param bufferSize buffer size for the underlaying outputstream.
- * @param replication replication factor for the file.
- * @param blockSize block size for the file.
- * @param createParent create parent directory if non-existent
- * @param compressionType The compression type.
- * @param codec The compression codec.
- * @param metadata The metadata of the file.
- * @return Returns the handle to the constructed SequenceFile Writer.
- * @throws IOException
- */
- @Deprecated
- public static Writer
- createWriter(FileSystem fs, Configuration conf, Path name,
- Class keyClass, Class valClass, int bufferSize,
- short replication, long blockSize, boolean createParent,
- CompressionType compressionType, CompressionCodec codec,
- Metadata metadata) throws IOException {
- return createWriter(FileContext.getFileContext(fs.getUri(), conf),
- conf, name, keyClass, valClass, compressionType, codec,
- metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE),
- CreateOpts.bufferSize(bufferSize),
- createParent ? CreateOpts.createParent()
- : CreateOpts.donotCreateParent(),
- CreateOpts.repFac(replication),
- CreateOpts.blockSize(blockSize)
- );
- }
-
- /**
- * Construct the preferred type of SequenceFile Writer.
- * @param fc The context for the specified file.
- * @param conf The configuration.
- * @param name The name of the file.
- * @param keyClass The 'key' type.
- * @param valClass The 'value' type.
- * @param compressionType The compression type.
- * @param codec The compression codec.
- * @param metadata The metadata of the file.
- * @param createFlag gives the semantics of create: overwrite, append etc.
- * @param opts file creation options; see {@link CreateOpts}.
- * @return Returns the handle to the constructed SequenceFile Writer.
- * @throws IOException
- */
- public static Writer
- createWriter(FileContext fc, Configuration conf, Path name,
- Class keyClass, Class valClass,
- CompressionType compressionType, CompressionCodec codec,
- Metadata metadata,
- final EnumSet<CreateFlag> createFlag, CreateOpts... opts)
- throws IOException {
- return createWriter(conf, fc.create(name, createFlag, opts),
- keyClass, valClass, compressionType, codec, metadata).ownStream();
- }
-
- /**
- * Construct the preferred type of SequenceFile Writer.
- * @param fs The configured filesystem.
- * @param conf The configuration.
- * @param name The name of the file.
- * @param keyClass The 'key' type.
- * @param valClass The 'value' type.
- * @param compressionType The compression type.
- * @param codec The compression codec.
- * @param progress The Progressable object to track progress.
- * @return Returns the handle to the constructed SequenceFile Writer.
- * @throws IOException
- * @deprecated Use {@link #createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)}
- * instead.
- */
- @Deprecated
- public static Writer
- createWriter(FileSystem fs, Configuration conf, Path name,
- Class keyClass, Class valClass,
- CompressionType compressionType, CompressionCodec codec,
- Progressable progress) throws IOException {
- return createWriter(conf, Writer.file(name),
- Writer.filesystem(fs),
- Writer.keyClass(keyClass),
- Writer.valueClass(valClass),
- Writer.compression(compressionType, codec),
- Writer.progressable(progress));
- }
-
- /**
- * Construct the preferred type of 'raw' SequenceFile Writer.
- * @param conf The configuration.
- * @param out The stream on top which the writer is to be constructed.
- * @param keyClass The 'key' type.
- * @param valClass The 'value' type.
- * @param compressionType The compression type.
- * @param codec The compression codec.
- * @param metadata The metadata of the file.
- * @return Returns the handle to the constructed SequenceFile Writer.
- * @throws IOException
- * @deprecated Use {@link #createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)}
- * instead.
- */
- @Deprecated
- public static Writer
- createWriter(Configuration conf, FSDataOutputStream out,
- Class keyClass, Class valClass,
- CompressionType compressionType,
- CompressionCodec codec, Metadata metadata) throws IOException {
- return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
- Writer.valueClass(valClass),
- Writer.compression(compressionType, codec),
- Writer.metadata(metadata));
- }
-
- /**
- * Construct the preferred type of 'raw' SequenceFile Writer.
- * @param conf The configuration.
- * @param out The stream on top which the writer is to be constructed.
- * @param keyClass The 'key' type.
- * @param valClass The 'value' type.
- * @param compressionType The compression type.
- * @param codec The compression codec.
- * @return Returns the handle to the constructed SequenceFile Writer.
- * @throws IOException
- * @deprecated Use {@link #createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)}
- * instead.
- */
- @Deprecated
- public static Writer
- createWriter(Configuration conf, FSDataOutputStream out,
- Class keyClass, Class valClass, CompressionType compressionType,
- CompressionCodec codec) throws IOException {
- return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
- Writer.valueClass(valClass),
- Writer.compression(compressionType, codec));
- }
-
-
- /**
- * The interface to 'raw' values of SequenceFiles: the bytes of one value
- * exactly as they were read, either uncompressed or still compressed.
- */
- public static interface ValueBytes {
-
- /** Writes the uncompressed bytes to the outStream. An implementation that
- * holds compressed bytes decompresses them while writing.
- * @param outStream : Stream to write uncompressed bytes into.
- * @throws IOException if writing (or decompressing) fails
- */
- public void writeUncompressedBytes(DataOutputStream outStream)
- throws IOException;
-
- /** Write compressed bytes to outStream.
- * Note: that it will NOT compress the bytes if they are not compressed.
- * @param outStream : Stream to write compressed bytes into.
- * @throws IllegalArgumentException if the held bytes are not compressed
- * @throws IOException if writing fails
- */
- public void writeCompressedBytes(DataOutputStream outStream)
- throws IllegalArgumentException, IOException;
-
- /**
- * Size of stored data.
- * @return the number of bytes currently held, in the form they are stored
- */
- public int getSize();
- }
-
- /**
-  * {@link ValueBytes} holder for a value that is stored without compression.
-  * The backing array is kept between calls to <code>reset</code> and only
-  * replaced when a longer value arrives.
-  */
- private static class UncompressedBytes implements ValueBytes {
-   private int dataSize = 0;
-   private byte[] data = null;
-
-   private UncompressedBytes() {
-   }
-
-   /**
-    * Loads the next value from a stream.
-    * @param in the stream to read from
-    * @param length the exact number of bytes to read
-    * @throws IOException if <code>length</code> bytes cannot be read
-    */
-   private void reset(DataInputStream in, int length) throws IOException {
-     final boolean tooSmall = (data == null) || (data.length < length);
-     if (tooSmall) {
-       // first allocation is exact; later ones at least double the array
-       final int capacity =
-         (data == null) ? length : Math.max(length, data.length * 2);
-       data = new byte[capacity];
-     }
-     dataSize = -1;               // stays -1 if the read below throws
-     in.readFully(data, 0, length);
-     dataSize = length;
-   }
-
-   @Override
-   public int getSize() {
-     return dataSize;
-   }
-
-   @Override
-   public void writeUncompressedBytes(DataOutputStream outStream)
-     throws IOException {
-     outStream.write(data, 0, dataSize);
-   }
-
-   /** Always fails: this holder has no compressed form of the value. */
-   @Override
-   public void writeCompressedBytes(DataOutputStream outStream)
-     throws IllegalArgumentException, IOException {
-     final String message = "UncompressedBytes cannot be compressed!";
-     throw new IllegalArgumentException(message);
-   }
-
- } // UncompressedBytes
-
- /**
-  * {@link ValueBytes} holder for a value that is stored compressed. The
-  * bytes are kept compressed and are only run through the codec when the
-  * uncompressed form is asked for.
-  */
- private static class CompressedBytes implements ValueBytes {
-   private int dataSize = 0;
-   private byte[] data = null;
-   DataInputBuffer rawData = null;
-   CompressionCodec codec = null;
-   CompressionInputStream decompressedStream = null;
-
-   private CompressedBytes(CompressionCodec codec) {
-     this.codec = codec;
-   }
-
-   /**
-    * Loads the next compressed value from a stream.
-    * @param in the stream to read from
-    * @param length the exact number of bytes to read
-    * @throws IOException if <code>length</code> bytes cannot be read
-    */
-   private void reset(DataInputStream in, int length) throws IOException {
-     final boolean tooSmall = (data == null) || (data.length < length);
-     if (tooSmall) {
-       // first allocation is exact; later ones at least double the array
-       final int capacity =
-         (data == null) ? length : Math.max(length, data.length * 2);
-       data = new byte[capacity];
-     }
-     dataSize = -1;               // stays -1 if the read below throws
-     in.readFully(data, 0, length);
-     dataSize = length;
-   }
-
-   @Override
-   public int getSize() {
-     return dataSize;
-   }
-
-   /**
-    * Decompresses the held bytes into <code>outStream</code>. The
-    * decompression stream is created on first use and reset on later calls.
-    */
-   @Override
-   public void writeUncompressedBytes(DataOutputStream outStream)
-     throws IOException {
-     if (decompressedStream != null) {
-       decompressedStream.resetState();
-     } else {
-       rawData = new DataInputBuffer();
-       decompressedStream = codec.createInputStream(rawData);
-     }
-     rawData.reset(data, 0, dataSize);
-
-     final byte[] chunk = new byte[8192];
-     for (int count = decompressedStream.read(chunk, 0, chunk.length);
-          count != -1;
-          count = decompressedStream.read(chunk, 0, chunk.length)) {
-       outStream.write(chunk, 0, count);
-     }
-   }
-
-   /** Writes the held bytes unchanged, i.e. still compressed. */
-   @Override
-   public void writeCompressedBytes(DataOutputStream outStream)
-     throws IllegalArgumentException, IOException {
-     outStream.write(data, 0, dataSize);
-   }
-
- } // CompressedBytes
-
- /**
-  * The class encapsulating the metadata of a file.
-  * The metadata of a file is a collection of attribute name/value
-  * pairs of {@link Text} type, held in a {@link TreeMap}.
-  *
-  * <p>Instances are mutable, and {@link #hashCode()} depends on the current
-  * attributes, so an instance must not be modified while it is used as a key
-  * in a hash-based collection.</p>
-  */
- public static class Metadata implements Writable {
-
-   private TreeMap<Text, Text> theMetadata;
-
-   /** Creates an empty set of attributes. */
-   public Metadata() {
-     this(new TreeMap<Text, Text>());
-   }
-
-   /**
-    * Creates metadata backed by the given map. The map is stored as-is, not
-    * copied, so later changes to it are visible through this object.
-    * @param arg the attributes to use; <code>null</code> means none
-    */
-   public Metadata(TreeMap<Text, Text> arg) {
-     if (arg == null) {
-       this.theMetadata = new TreeMap<Text, Text>();
-     } else {
-       this.theMetadata = arg;
-     }
-   }
-
-   /**
-    * @param name the attribute name
-    * @return the value stored under <code>name</code>, or <code>null</code>
-    *         if there is none
-    */
-   public Text get(Text name) {
-     return this.theMetadata.get(name);
-   }
-
-   /**
-    * Stores <code>value</code> under <code>name</code>, replacing any
-    * previous value.
-    */
-   public void set(Text name, Text value) {
-     this.theMetadata.put(name, value);
-   }
-
-   /**
-    * @return a new map with the same entries; adding to or removing from it
-    *         does not affect this object (the {@link Text} instances
-    *         themselves are shared)
-    */
-   public TreeMap<Text, Text> getMetadata() {
-     return new TreeMap<Text, Text>(this.theMetadata);
-   }
-
-   /** Writes the entry count followed by each name and value, in map order. */
-   @Override
-   public void write(DataOutput out) throws IOException {
-     out.writeInt(this.theMetadata.size());
-     for (Map.Entry<Text, Text> entry : this.theMetadata.entrySet()) {
-       entry.getKey().write(out);
-       entry.getValue().write(out);
-     }
-   }
-
-   /**
-    * Replaces the current attributes with the ones read from <code>in</code>.
-    * @throws IOException if the stored entry count is negative or reading fails
-    */
-   @Override
-   public void readFields(DataInput in) throws IOException {
-     int sz = in.readInt();
-     if (sz < 0) throw new IOException("Invalid size: " + sz + " for file metadata object");
-     this.theMetadata = new TreeMap<Text, Text>();
-     for (int i = 0; i < sz; i++) {
-       Text key = new Text();
-       Text val = new Text();
-       key.readFields(in);
-       val.readFields(in);
-       this.theMetadata.put(key, val);
-     }
-   }
-
-   /** Equal only to another object of exactly this class with equal attributes. */
-   @Override
-   public boolean equals(Object other) {
-     if (this == other) {
-       return true;
-     }
-     if (other == null || other.getClass() != this.getClass()) {
-       return false;
-     }
-     return equals((Metadata) other);
-   }
-
-   /**
-    * Compares the attributes pairwise, in map iteration order.
-    * @param other the metadata to compare with; may be <code>null</code>
-    * @return <code>true</code> when both hold equal names with equal values
-    */
-   public boolean equals(Metadata other) {
-     if (other == null
-         || this.theMetadata.size() != other.theMetadata.size()) {
-       return false;
-     }
-     Iterator<Map.Entry<Text, Text>> theirs =
-       other.theMetadata.entrySet().iterator();
-     for (Map.Entry<Text, Text> mine : this.theMetadata.entrySet()) {
-       if (!theirs.hasNext()) {
-         return false;
-       }
-       Map.Entry<Text, Text> their = theirs.next();
-       if (!mine.getKey().equals(their.getKey())
-           || !mine.getValue().equals(their.getValue())) {
-         return false;
-       }
-     }
-     return !theirs.hasNext();
-   }
-
-   /**
-    * Hash of the attributes, consistent with {@link #equals(Metadata)}: two
-    * instances with equal names and values hash alike. The previous
-    * implementation asserted <code>false</code>, so with assertions enabled
-    * any hash-based use of a Metadata threw an <code>AssertionError</code>.
-    */
-   @Override
-   public int hashCode() {
-     return this.theMetadata.hashCode();
-   }
-
-   /** One "size" line followed by one tab-separated line per attribute. */
-   @Override
-   public String toString() {
-     StringBuilder sb = new StringBuilder();
-     sb.append("size: ").append(this.theMetadata.size()).append("\n");
-     for (Map.Entry<Text, Text> entry : this.theMetadata.entrySet()) {
-       sb.append("\t").append(entry.getKey().toString())
-         .append("\t").append(entry.getValue().toString());
-       sb.append("\n");
-     }
-     return sb.toString();
-   }
- }
-
- /** Write key/value pairs to a sequence-format file. */
- public static class Writer implements java.io.Closeable, Syncable {
- private Configuration conf;
- FSDataOutputStream out; // NOTE(review): presumably the stream the file contents go to -- confirm in the constructor
- boolean ownOutputStream = true; // true by default; NOTE(review): presumably decides whether close() also closes 'out' -- confirm
- DataOutputBuffer buffer = new DataOutputBuffer();
-
- Class keyClass;
- Class valClass;
-
- private final CompressionType compress; // final without initializer: has to be assigned by the constructor
- CompressionCodec codec = null;
- CompressionOutputStream deflateFilter = null;
- DataOutputStream deflateOut = null;
- Metadata metadata = null;
- Compressor compressor = null;
-
- protected Serializer keySerializer;
- protected Serializer uncompressedValSerializer;
- protected Serializer compressedValSerializer;
-
- // Insert a globally unique 16-byte value every few entries, so that one
- // can seek into the middle of a file and then synchronize with record
- // starts and ends by scanning for this value.
- long lastSyncPos; // position of last sync
- byte[] sync; // 16 random bytes
- {
- try {
- MessageDigest digester = MessageDigest.getInstance("MD5");
- long time = Time.now();
- digester.update((new UID()+"@"+time).getBytes());
- sync = digester.digest();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public static interface Option {} // marker interface: declares no methods; implemented by the writer option classes that follow
-
- /** Writer option carrying a {@link Path}; created by {@link #file(Path)}. */
- static class FileOption extends Options.PathOption
-                         implements Option {
-   FileOption(final Path path) {
-     super(path);
-   }
- }
-
- /**
-  * Writer option carrying a {@link FileSystem}.
-  * @deprecated only used for backwards-compatibility in the createWriter methods
-  * that take FileSystem.
-  */
- @Deprecated
- private static class FileSystemOption implements Option {
-   private final FileSystem value;
-
-   protected FileSystemOption(final FileSystem value) {
-     this.value = value;
-   }
-
-   /** @return the filesystem handed to the constructor */
-   public FileSystem getValue() {
-     return this.value;
-   }
- }
-
- /**
-  * Writer option carrying an already opened output stream; created by
-  * {@link #stream(FSDataOutputStream)}.
-  */
- static class StreamOption extends Options.FSDataOutputStreamOption
-                           implements Option {
-   StreamOption(final FSDataOutputStream stream) {
-     super(stream);
-   }
- }
-
- /** Writer option carrying a buffer size; created by {@link #bufferSize(int)}. */
- static class BufferSizeOption extends Options.IntegerOption
-                               implements Option {
-   BufferSizeOption(final int value) {
-     super(value);
-   }
- }
-
- static class BlockSizeOption extends Options.LongOption implements Option {
- BlockSizeOption(long value) {
- super(value);
- }
- }
-
- /**
-  * Writer option carrying a replication factor, held as an <code>int</code>;
-  * created by {@link #replication(short)}.
-  */
- static class ReplicationOption extends Options.IntegerOption
-                                implements Option {
-   ReplicationOption(final int value) {
-     super(value);
-   }
- }
-
- /** Option carrying the key class; the Writer constructor falls back to Object.class when absent. */
- static class KeyClassOption extends Options.ClassOption implements Option {
- KeyClassOption(Class<?> value) {
- super(value);
- }
- }
-
- /** Option carrying the value class; the Writer constructor falls back to Object.class when absent. */
- static class ValueClassOption extends Options.ClassOption
- implements Option {
- ValueClassOption(Class<?> value) {
- super(value);
- }
- }
-
- /** Option carrying the file Metadata; the Writer constructor uses an empty Metadata when absent. */
- static class MetadataOption implements Option {
- private final Metadata value;
- MetadataOption(Metadata value) {
- this.value = value;
- }
- /** Returns the metadata given at construction. */
- Metadata getValue() {
- return value;
- }
- }
-
- /** Option carrying the progress reporter passed to FileSystem.create when the Writer creates the file. */
- static class ProgressableOption extends Options.ProgressableOption
- implements Option {
- ProgressableOption(Progressable value) {
- super(value);
- }
- }
-
- /**
-  * Option pairing a compression type with the codec used for it. When a
-  * type other than NONE is requested without a codec, a DefaultCodec is
-  * substituted; otherwise the codec is stored exactly as given (possibly null).
-  */
- private static class CompressionOption implements Option {
-   private final CompressionType value;
-   private final CompressionCodec codec;
-
-   /** Creates the option with no explicit codec. */
-   CompressionOption(CompressionType value) {
-     this(value, null);
-   }
-
-   /** Creates the option, defaulting the codec when compression is on and none was given. */
-   CompressionOption(CompressionType value, CompressionCodec codec) {
-     this.value = value;
-     if (codec == null && value != CompressionType.NONE) {
-       this.codec = new DefaultCodec();
-     } else {
-       this.codec = codec;
-     }
-   }
-
-   /** Returns the requested compression type. */
-   CompressionType getValue() {
-     return value;
-   }
-
-   /** Returns the codec to use; may be null. */
-   CompressionCodec getCodec() {
-     return codec;
-   }
- }
-
- /** Creates an option naming the file the Writer should create. */
- public static Option file(Path value) {
- return new FileOption(value);
- }
-
- /**
- * Creates an option carrying an explicit FileSystem.
- * @deprecated only used for backwards-compatibility in the createWriter methods
- * that take FileSystem.
- */
- @Deprecated
- private static Option filesystem(FileSystem fs) {
- return new SequenceFile.Writer.FileSystemOption(fs);
- }
-
- /** Creates an option setting the buffer size used when the Writer creates the file. */
- public static Option bufferSize(int value) {
- return new BufferSizeOption(value);
- }
-
- /** Creates an option supplying an already-open output stream to write to. */
- public static Option stream(FSDataOutputStream value) {
- return new StreamOption(value);
- }
-
- /** Creates an option setting the replication factor used when the Writer creates the file. */
- public static Option replication(short value) {
- return new ReplicationOption(value);
- }
-
- /** Creates an option setting the block size used when the Writer creates the file. */
- public static Option blockSize(long value) {
- return new BlockSizeOption(value);
- }
-
- /** Creates an option supplying the progress reporter used when the Writer creates the file. */
- public static Option progressable(Progressable value) {
- return new ProgressableOption(value);
- }
-
- /** Creates an option setting the class of the keys to be written. */
- public static Option keyClass(Class<?> value) {
- return new KeyClassOption(value);
- }
-
- /** Creates an option setting the class of the values to be written. */
- public static Option valueClass(Class<?> value) {
- return new ValueClassOption(value);
- }
-
- /** Creates an option supplying the metadata to store in the file header. */
- public static Option metadata(Metadata value) {
- return new MetadataOption(value);
- }
-
- /**
- * Creates an option setting the compression type with no explicit codec;
- * CompressionOption substitutes a DefaultCodec unless the type is NONE.
- */
- public static Option compression(CompressionType value) {
- return new CompressionOption(value);
- }
-
- /** Creates an option setting the compression type together with the codec to use. */
- public static Option compression(CompressionType value,
- CompressionCodec codec) {
- return new CompressionOption(value, codec);
- }
-
- /**
- * Construct an uncompressed writer from a set of options.
- * Exactly one of the file or stream options must be given. The buffer size,
- * block size, replication and progress options only apply when a file is
- * given. When the writer creates the file itself it also owns (and later
- * closes) the stream.
- * @param conf the configuration to use
- * @param opts the options used when creating the writer
- * @throws IOException if it fails
- * @throws IllegalArgumentException if the options are inconsistent, or if a
- * GzipCodec is requested while neither native-hadoop code nor native
- * zlib is loaded
- */
- Writer(Configuration conf,
- Option... opts) throws IOException {
- BlockSizeOption blockSizeOption =
- Options.getOption(BlockSizeOption.class, opts);
- BufferSizeOption bufferSizeOption =
- Options.getOption(BufferSizeOption.class, opts);
- ReplicationOption replicationOption =
- Options.getOption(ReplicationOption.class, opts);
- ProgressableOption progressOption =
- Options.getOption(ProgressableOption.class, opts);
- FileOption fileOption = Options.getOption(FileOption.class, opts);
- FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
- StreamOption streamOption = Options.getOption(StreamOption.class, opts);
- KeyClassOption keyClassOption =
- Options.getOption(KeyClassOption.class, opts);
- ValueClassOption valueClassOption =
- Options.getOption(ValueClassOption.class, opts);
- MetadataOption metadataOption =
- Options.getOption(MetadataOption.class, opts);
- CompressionOption compressionTypeOption =
- Options.getOption(CompressionOption.class, opts);
- // check consistency of options
- // (true when both or neither of file/stream are set, i.e. not exactly one)
- if ((fileOption == null) == (streamOption == null)) {
- throw new IllegalArgumentException("file or stream must be specified");
- }
- if (fileOption == null && (blockSizeOption != null ||
- bufferSizeOption != null ||
- replicationOption != null ||
- progressOption != null)) {
- throw new IllegalArgumentException("file modifier options not " +
- "compatible with stream");
- }
-
- FSDataOutputStream out;
- // the writer owns the stream only when it creates the file itself
- boolean ownStream = fileOption != null;
- if (ownStream) {
- Path p = fileOption.getValue();
- FileSystem fs;
- if (fsOption != null) {
- fs = fsOption.getValue();
- } else {
- fs = p.getFileSystem(conf);
- }
- // unset file modifiers fall back to configuration / file-system defaults
- int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
- bufferSizeOption.getValue();
- short replication = replicationOption == null ?
- fs.getDefaultReplication(p) :
- (short) replicationOption.getValue();
- long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
- blockSizeOption.getValue();
- Progressable progress = progressOption == null ? null :
- progressOption.getValue();
- // second argument 'true' is passed as the overwrite flag to create()
- out = fs.create(p, true, bufferSize, replication, blockSize, progress);
- } else {
- out = streamOption.getValue();
- }
- Class<?> keyClass = keyClassOption == null ?
- Object.class : keyClassOption.getValue();
- Class<?> valueClass = valueClassOption == null ?
- Object.class : valueClassOption.getValue();
- Metadata metadata = metadataOption == null ?
- new Metadata() : metadataOption.getValue();
- // NOTE(review): compressionTypeOption is dereferenced without a null check,
- // so callers presumably always supply a CompressionOption -- confirm.
- this.compress = compressionTypeOption.getValue();
- final CompressionCodec codec = compressionTypeOption.getCodec();
- if (codec != null &&
- (codec instanceof GzipCodec) &&
- !NativeCodeLoader.isNativeCodeLoaded() &&
- !ZlibFactory.isNativeZlibLoaded(conf)) {
- throw new IllegalArgumentException("SequenceFile doesn't work with " +
- "GzipCodec without native-hadoop " +
- "code!");
- }
- init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
- }
-
- /** Create the named file, uncompressed, with empty metadata.
- * The writer owns the created stream and closes it in {@link #close()}.
- * @deprecated Use
- * {@link SequenceFile#createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)}
- * instead.
- */
- @Deprecated
- public Writer(FileSystem fs, Configuration conf, Path name,
- Class keyClass, Class valClass) throws IOException {
- this.compress = CompressionType.NONE;
- init(conf, fs.create(name), true, keyClass, valClass, null,
- new Metadata());
- }
-
- /** Create the named file, uncompressed, with a write-progress reporter
- * and the given metadata. The writer owns the created stream.
- * @deprecated Use
- * {@link SequenceFile#createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)}
- * instead.
- */
- @Deprecated
- public Writer(FileSystem fs, Configuration conf, Path name,
- Class keyClass, Class valClass,
- Progressable progress, Metadata metadata) throws IOException {
- this.compress = CompressionType.NONE;
- init(conf, fs.create(name, progress), true, keyClass, valClass,
- null, metadata);
- }
-
- /** Create the named file, uncompressed, with a write-progress reporter and
- * explicit buffer size, replication and block size. The literal 'true'
- * passed to create() is its overwrite flag. The writer owns the stream.
- * @deprecated Use
- * {@link SequenceFile#createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)}
- * instead.
- */
- @Deprecated
- public Writer(FileSystem fs, Configuration conf, Path name,
- Class keyClass, Class valClass,
- int bufferSize, short replication, long blockSize,
- Progressable progress, Metadata metadata) throws IOException {
- this.compress = CompressionType.NONE;
- init(conf,
- fs.create(name, true, bufferSize, replication, blockSize, progress),
- true, keyClass, valClass, null, metadata);
- }
-
- /** Returns true when the compression type is anything other than NONE. */
- boolean isCompressed() { return compress != CompressionType.NONE; }
- /** Returns true when the compression type is BLOCK. */
- boolean isBlockCompressed() { return compress == CompressionType.BLOCK; }
-
- /** Marks the output stream as owned by this writer, so close() closes it; returns this. */
- Writer ownStream() { this.ownOutputStream = true; return this; }
-
- /**
- * Write and flush the file header. Fields are written in this order:
- * VERSION bytes, key class name, value class name, compressed flag,
- * block-compressed flag, codec class name (only when compressed),
- * metadata, and finally the sync marker.
- */
- private void writeFileHeader()
- throws IOException {
- out.write(VERSION);
- Text.writeString(out, keyClass.getName());
- Text.writeString(out, valClass.getName());
-
- out.writeBoolean(this.isCompressed());
- out.writeBoolean(this.isBlockCompressed());
-
- if (this.isCompressed()) {
- Text.writeString(out, (codec.getClass()).getName());
- }
- this.metadata.write(out);
- out.write(sync); // write the sync bytes
- out.flush(); // flush header
- }
-
- /**
-  * Initialize the writer: record the stream and classes, open the key and
-  * value serializers on the internal buffer, set up the compression
-  * pipeline when a codec is given, and finally write the file header.
-  *
-  * @param conf configuration used to look up serializers and configure the codec
-  * @param out stream to write to
-  * @param ownStream whether close() should close {@code out}
-  * @param keyClass class of the keys
-  * @param valClass class of the values
-  * @param codec compression codec, or null for no compression
-  * @param metadata metadata written into the header
-  * @throws IOException if no serializer is registered for the key or value
-  *         class, or if writing the header fails
-  */
- @SuppressWarnings("unchecked")
- void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
-           Class keyClass, Class valClass,
-           CompressionCodec codec, Metadata metadata)
-   throws IOException {
-   this.conf = conf;
-   this.out = out;
-   this.ownOutputStream = ownStream;
-   this.keyClass = keyClass;
-   this.valClass = valClass;
-   this.codec = codec;
-   this.metadata = metadata;
-   SerializationFactory serializationFactory = new SerializationFactory(conf);
-   this.keySerializer = serializationFactory.getSerializer(keyClass);
-   if (this.keySerializer == null) {
-     throw noSerializerFound("Key", keyClass);
-   }
-   this.keySerializer.open(buffer);
-   this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
-   if (this.uncompressedValSerializer == null) {
-     throw noSerializerFound("Value", valClass);
-   }
-   this.uncompressedValSerializer.open(buffer);
-   if (this.codec != null) {
-     ReflectionUtils.setConf(this.codec, this.conf);
-     this.compressor = CodecPool.getCompressor(this.codec);
-     // compressed values flow: serializer -> deflateOut -> deflateFilter -> buffer
-     this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
-     this.deflateOut =
-       new DataOutputStream(new BufferedOutputStream(deflateFilter));
-     this.compressedValSerializer = serializationFactory.getSerializer(valClass);
-     if (this.compressedValSerializer == null) {
-       throw noSerializerFound("Value", valClass);
-     }
-     this.compressedValSerializer.open(deflateOut);
-   }
-   writeFileHeader();
- }
-
- /**
-  * Builds the exception thrown when no serializer is registered for a class.
-  * The original text ran "using" and "custom" together; a space is now
-  * emitted between them.
-  */
- private static IOException noSerializerFound(String role, Class c) {
-   return new IOException(
-       "Could not find a serializer for the " + role + " class: '"
-       + c.getCanonicalName() + "'. "
-       + "Please ensure that the configuration '"
-       + CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
-       + "properly configured, if you're using "
-       + "custom serialization.");
- }
-
- /** Returns the class of keys in this file, as passed to init(). */
- public Class getKeyClass() { return keyClass; }
-
- /** Returns the class of values in this file, as passed to init(). */
- public Class getValueClass() { return valClass; }
-
- /** Returns the compression codec of data in this file; null when init() was given no codec. */
- public CompressionCodec getCompressionCodec() { return codec; }
-
- /**
-  * Create a sync point: writes SYNC_ESCAPE followed by the sync marker and
-  * remembers the resulting position. Does nothing when there is no sync
-  * marker or when the stream is still at the position of the last sync.
-  */
- public void sync() throws IOException {
-   if (sync == null || lastSyncPos == out.getPos()) {
-     return;
-   }
-   out.writeInt(SYNC_ESCAPE);          // mark the start of the sync
-   out.write(sync);                    // write sync
-   lastSyncPos = out.getPos();         // update lastSyncPos
- }
-
- /**
- * flush all currently written data to the file system by calling sync()
- * on the underlying stream; does nothing when the stream is null.
- * @deprecated Use {@link #hsync()} or {@link #hflush()} instead
- */
- @Deprecated
- public void syncFs() throws IOException {
- if (out != null) {
- out.sync(); // flush contents to file system
- }
- }
-
- /** Delegates to hsync() on the underlying stream; does nothing when the stream is null. */
- @Override
- public void hsync() throws IOException {
- if (out != null) {
- out.hsync();
- }
- }
- // Pivotal changes begin
- public void hsyncWithSizeUpdate() throws IOException {
- if (out != null) {
- if (out instanceof HdfsDataOutputStream) {
- try {
- ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
- } catch (NoSuchMethodError e){
- // We are probably working with an older version of hadoop jars which does not have the
- // hsync function with SyncFlag. Use the hsync version that does not update the size.
- out.hsync();
- }
- }
- else {
- out.hsync();
- }
- }
- }
- // Pivotal changes end
- /** Delegates to hflush() on the underlying stream; does nothing when the stream is null. */
- @Override
- public void hflush() throws IOException {
- if (out != null) {
- out.hflush();
- }
- }
-
- /** Returns the configuration this writer was initialized with. */
- Configuration getConf() { return conf; }
-
- /**
- * Close the file. Closes the serializers, returns the compressor to the
- * CodecPool, then closes the stream if this writer owns it or merely
- * flushes it otherwise. The stream reference is cleared afterwards, so the
- * stream part is skipped on a repeated call.
- */
- @Override
- public synchronized void close() throws IOException {
- keySerializer.close();
- uncompressedValSerializer.close();
- if (compressedValSerializer != null) {
- compressedValSerializer.close();
- }
-
- CodecPool.returnCompressor(compressor);
- compressor = null;
-
- if (out != null) {
-
- // Close the underlying stream iff we own it...
- if (ownOutputStream) {
- out.close();
- } else {
- out.flush();
- }
- out = null;
- }
- }
-
- /**
- * Emits a sync marker via sync() once at least SYNC_INTERVAL bytes have
- * been written since the last one; does nothing when there is no marker.
- */
- synchronized void checkAndWriteSync() throws IOException {
- if (sync != null &&
- out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
- sync();
- }
- }
-
- /** Append a key/value pair; delegates to {@link #append(Object, Object)}. */
- public void append(Writable key, Writable val)
- throws IOException {
- append((Object) key, (Object) val);
- }
-
- /**
- * Append a key/value pair. The key and value are serialized into the
- * internal buffer (the value through the compression stream when the
- * compression type is RECORD), then written as: total record length (int),
- * key length (int), followed by the buffered bytes.
- * @throws IOException if the key or value is not exactly of the configured
- * class (an exact class match is required, subclasses are rejected), or
- * on write failure
- */
- @SuppressWarnings("unchecked")
- public synchronized void append(Object key, Object val)
- throws IOException {
- if (key.getClass() != keyClass)
- throw new IOException("wrong key class: "+key.getClass().getName()
- +" is not "+keyClass);
- if (val.getClass() != valClass)
- throw new IOException("wrong value class: "+val.getClass().getName()
- +" is not "+valClass);
-
- buffer.reset();
-
- // Append the 'key'
- keySerializer.serialize(key);
- // buffer was just reset, so its length now equals the serialized key size
- int keyLength = buffer.getLength();
- if (keyLength < 0)
- throw new IOException("negative length keys not allowed: " + key);
-
- // Append the 'value'
- if (compress == CompressionType.RECORD) {
- deflateFilter.resetState();
- compressedValSerializer.serialize(val);
- deflateOut.flush();
- deflateFilter.finish();
- } else {
- uncompressedValSerializer.serialize(val);
- }
-
- // Write the record out
- checkAndWriteSync(); // sync
- out.writeInt(buffer.getLength()); // total record length
- out.writeInt(keyLength); // key portion length
- out.write(buffer.getData(), 0, buffer.getLength()); // data
- }
-
- /**
- * Append an already-serialized key and value. Writes the total record
- * length, the key length, the key bytes and then the value's uncompressed
- * bytes, emitting a sync marker first when one is due.
- * @param keyData array holding the serialized key
- * @param keyOffset offset of the key within keyData
- * @param keyLength number of key bytes; must not be negative
- * @param val the value bytes to write
- * @throws IOException if keyLength is negative or on write failure
- */
- public synchronized void appendRaw(byte[] keyData, int keyOffset,
- int keyLength, ValueBytes val) throws IOException {
- if (keyLength < 0)
- throw new IOException("negative length keys not allowed: " + keyLength);
-
- int valLength = val.getSize();
-
- checkAndWriteSync();
-
- out.writeInt(keyLength+valLength); // total record length
- out.writeInt(keyLength); // key portion length
- out.write(keyData, keyOffset, keyLength); // key
- val.writeUncompressedBytes(out); // value
- }
-
- /** Returns the current length of the output file, taken from the current
- * position of the underlying stream.
- *
- * <p>This always returns a synchronized position. In other words,
- * immediately after calling {@link SequenceFile.Reader#seek(long)} with a position
- * returned by this method, {@link SequenceFile.Reader#next(Writable)} may be called. However
- * the key may be earlier in the file than the key last written when this
- * method was called (e.g., with block-compression, it may be the first key
- * in the block that was being written when this method was called).
- */
- public synchronized long getLength() throws IOException {
- return out.getPos();
- }
-
- } // class Writer
-
- /**
- * Write key/compressed-value pairs to a sequence-format file. Keys are
- * written uncompressed; each value is compressed on its own.
- */
- static class RecordCompressWriter extends Writer {
-
- /** Creates the writer; all option handling is done by the Writer constructor. */
- RecordCompressWriter(Configuration conf,
- Option... options) throws IOException {
- super(conf, options);
- }
-
- /**
- * Append a key/value pair. The key is serialized into the buffer, the
- * value is serialized through the compression stream into the same buffer,
- * and the record is written as total length, key length, data.
- * An exact class match is required for both key and value.
- */
- @Override
- @SuppressWarnings("unchecked")
- public synchronized void append(Object key, Object val)
- throws IOException {
- if (key.getClass() != keyClass)
- throw new IOException("wrong key class: "+key.getClass().getName()
- +" is not "+keyClass);
- if (val.getClass() != valClass)
- throw new IOException("wrong value class: "+val.getClass().getName()
- +" is not "+valClass);
-
- buffer.reset();
-
- // Append the 'key'
- keySerializer.serialize(key);
- int keyLength = buffer.getLength();
- if (keyLength < 0)
- throw new IOException("negative length keys not allowed: " + key);
-
- // Compress 'value' and append it
- deflateFilter.resetState();
- compressedValSerializer.serialize(val);
- deflateOut.flush();
- deflateFilter.finish();
-
- // Write the record out
- checkAndWriteSync(); // sync
- out.writeInt(buffer.getLength()); // total record length
- out.writeInt(keyLength); // key portion length
- out.write(buffer.getData(), 0, buffer.getLength()); // data
- }
-
- /**
- * Append an already-serialized key and value. Unlike the base class, the
- * value is written via writeCompressedBytes.
- */
- @Override
- public synchronized void appendRaw(byte[] keyData, int keyOffset,
- int keyLength, ValueBytes val) throws IOException {
-
- if (keyLength < 0)
- throw new IOException("negative length keys not allowed: " + keyLength);
-
- int valLength = val.getSize();
-
- checkAndWriteSync(); // sync
- out.writeInt(keyLength+valLength); // total record length
- out.writeInt(keyLength); // key portion length
- out.write(keyData, keyOffset, keyLength); // 'key' data
- val.writeCompressedBytes(out); // 'value' data
- }
-
- } // RecordCompressionWriter
-
- /**
-  * Write compressed key/value blocks to a sequence-format file. Records are
-  * accumulated in four in-memory buffers (key lengths, keys, value lengths,
-  * values) and written out as one compressed block by {@link #sync()} once
-  * the buffered keys and values reach the configured block size.
-  */
- static class BlockCompressWriter extends Writer {
-
-   private int noBufferedRecords = 0;
-
-   private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
-   private DataOutputBuffer keyBuffer = new DataOutputBuffer();
-
-   private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
-   private DataOutputBuffer valBuffer = new DataOutputBuffer();
-
-   // threshold (keys + values, in bytes) at which a block is flushed
-   private final int compressionBlockSize;
-
-   /**
-    * Creates the writer, reads the block size from
-    * "io.seqfile.compress.blocksize" (default 1000000), and re-opens the key
-    * and value serializers on the per-block buffers instead of the shared one.
-    */
-   BlockCompressWriter(Configuration conf,
-                       Option... options) throws IOException {
-     super(conf, options);
-     compressionBlockSize =
-       conf.getInt("io.seqfile.compress.blocksize", 1000000);
-     keySerializer.close();
-     keySerializer.open(keyBuffer);
-     uncompressedValSerializer.close();
-     uncompressedValSerializer.open(valBuffer);
-   }
-
-   /** Workhorse to check and write out compressed data/lengths */
-   private synchronized
-   void writeBuffer(DataOutputBuffer uncompressedDataBuffer)
-     throws IOException {
-     deflateFilter.resetState();
-     buffer.reset();
-     deflateOut.write(uncompressedDataBuffer.getData(), 0,
-                      uncompressedDataBuffer.getLength());
-     deflateOut.flush();
-     deflateFilter.finish();
-
-     // compressed size as a vint, followed by the compressed bytes
-     WritableUtils.writeVInt(out, buffer.getLength());
-     out.write(buffer.getData(), 0, buffer.getLength());
-   }
-
-   /** Compress and flush contents to dfs; does nothing when no records are buffered. */
-   @Override
-   public synchronized void sync() throws IOException {
-     if (noBufferedRecords > 0) {
-       super.sync();
-
-       // No. of records
-       WritableUtils.writeVInt(out, noBufferedRecords);
-
-       // Write 'keys' and lengths
-       writeBuffer(keyLenBuffer);
-       writeBuffer(keyBuffer);
-
-       // Write 'values' and lengths
-       writeBuffer(valLenBuffer);
-       writeBuffer(valBuffer);
-
-       // Flush the file-stream
-       out.flush();
-
-       // Reset internal states
-       keyLenBuffer.reset();
-       keyBuffer.reset();
-       valLenBuffer.reset();
-       valBuffer.reset();
-       noBufferedRecords = 0;
-     }
-
-   }
-
-   /** Close the file, writing out any still-buffered records first. */
-   @Override
-   public synchronized void close() throws IOException {
-     if (out != null) {
-       sync();
-     }
-     super.close();
-   }
-
-   /**
-    * Append a key/value pair to the current block. An exact class match is
-    * required for both key and value.
-    */
-   @Override
-   @SuppressWarnings("unchecked")
-   public synchronized void append(Object key, Object val)
-     throws IOException {
-     // Report the offending class name (not the object's toString), matching
-     // the messages of the other writers in this file.
-     if (key.getClass() != keyClass)
-       throw new IOException("wrong key class: "+key.getClass().getName()
-                             +" is not "+keyClass);
-     if (val.getClass() != valClass)
-       throw new IOException("wrong value class: "+val.getClass().getName()
-                             +" is not "+valClass);
-
-     // Save key/value into respective buffers
-     int oldKeyLength = keyBuffer.getLength();
-     keySerializer.serialize(key);
-     int keyLength = keyBuffer.getLength() - oldKeyLength;
-     if (keyLength < 0)
-       throw new IOException("negative length keys not allowed: " + key);
-     WritableUtils.writeVInt(keyLenBuffer, keyLength);
-
-     int oldValLength = valBuffer.getLength();
-     uncompressedValSerializer.serialize(val);
-     int valLength = valBuffer.getLength() - oldValLength;
-     WritableUtils.writeVInt(valLenBuffer, valLength);
-
-     // Added another key/value pair
-     ++noBufferedRecords;
-
-     // Compress and flush?
-     int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
-     if (currentBlockSize >= compressionBlockSize) {
-       sync();
-     }
-   }
-
-   /** Append an already-serialized key and value to the current block. */
-   @Override
-   public synchronized void appendRaw(byte[] keyData, int keyOffset,
-       int keyLength, ValueBytes val) throws IOException {
-
-     if (keyLength < 0)
-       throw new IOException("negative length keys not allowed: " + keyLength);
-
-     int valLength = val.getSize();
-
-     // Save key/value data in relevant buffers
-     WritableUtils.writeVInt(keyLenBuffer, keyLength);
-     keyBuffer.write(keyData, keyOffset, keyLength);
-     WritableUtils.writeVInt(valLenBuffer, valLength);
-     val.writeUncompressedBytes(valBuffer);
-
-     // Added another key/value pair
-     ++noBufferedRecords;
-
-     // Compress and flush?
-     int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
-     if (currentBlockSize >= compressionBlockSize) {
-       sync();
-     }
-   }
-
- } // BlockCompressionWriter
-
- /** Get the configured buffer size: "io.file.buffer.size", defaulting to 4096. */
- private static int getBufferSize(Configuration conf) {
- return conf.getInt("io.file.buffer.size", 4096);
- }
-
- /** Reads key/value pairs from a sequence-format file. */
- public static class Reader implements java.io.Closeable {
- // Name of the file being read, or "<unknown>" when reading from a stream
- // (set in initialize()).
- private String filename;
- private FSDataInputStream in;
- private DataOutputBuffer outBuf = new DataOutputBuffer();
-
- // Format version: the fourth byte of the header's version block.
- private byte version;
-
- // Class names as read from the header.
- private String keyClassName;
- private String valClassName;
- private Class keyClass;
- private Class valClass;
-
- private CompressionCodec codec = null;
- private Metadata metadata = null;
-
- // 'sync' holds the marker read from the file header.
- private byte[] sync = new byte[SYNC_HASH_SIZE];
- private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
- private boolean syncSeen;
-
- // headerEnd: stream position right after the header's sync marker.
- // end: start position plus requested length (capped at Long.MAX_VALUE).
- private long headerEnd;
- private long end;
- private int keyLength;
- private int recordLength;
-
- // Compression flags as read from the header.
- private boolean decompress;
- private boolean blockCompressed;
-
- private Configuration conf;
-
- private int noBufferedRecords = 0;
- private boolean lazyDecompress = true;
- private boolean valuesDecompressed = true;
-
- private int noBufferedKeys = 0;
- private int noBufferedValues = 0;
-
- // Key-length, key and value-length pipelines; init() only sets these up
- // for block-compressed files.
- private DataInputBuffer keyLenBuffer = null;
- private CompressionInputStream keyLenInFilter = null;
- private DataInputStream keyLenIn = null;
- private Decompressor keyLenDecompressor = null;
- private DataInputBuffer keyBuffer = null;
- private CompressionInputStream keyInFilter = null;
- private DataInputStream keyIn = null;
- private Decompressor keyDecompressor = null;
-
- private DataInputBuffer valLenBuffer = null;
- private CompressionInputStream valLenInFilter = null;
- private DataInputStream valLenIn = null;
- private Decompressor valLenDecompressor = null;
- // Value pipeline; valInFilter and valDecompressor are only set when the
- // file is compressed, otherwise valIn is valBuffer itself.
- private DataInputBuffer valBuffer = null;
- private CompressionInputStream valInFilter = null;
- private DataInputStream valIn = null;
- private Decompressor valDecompressor = null;
-
- private Deserializer keyDeserializer;
- private Deserializer valDeserializer;
-
- /**
- * A tag interface for all of the Reader options. It declares no methods;
- * the Reader constructor looks options up by their concrete class.
- */
- public static interface Option {}
-
- /**
- * Create an option to specify the path name of the sequence file.
- * Exactly one of this option or {@link #stream(FSDataInputStream)} must be
- * passed to the Reader constructor.
- * @param value the path to read
- * @return a new option
- */
- public static Option file(Path value) {
- return new FileOption(value);
- }
-
- /**
- * Create an option to specify the stream with the sequence file.
- * Exactly one of this option or {@link #file(Path)} must be passed to the
- * Reader constructor.
- * @param value the stream to read.
- * @return a new option
- */
- public static Option stream(FSDataInputStream value) {
- return new InputStreamOption(value);
- }
-
- /**
- * Create an option to specify the starting byte to read. The Reader seeks
- * to this position before reading the header; it defaults to 0.
- * @param value the number of bytes to skip over
- * @return a new option
- */
- public static Option start(long value) {
- return new StartOption(value);
- }
-
- /**
- * Create an option to specify the number of bytes to read. When absent,
- * the Reader uses the file's length for a path and Long.MAX_VALUE for a
- * stream.
- * @param value the number of bytes to read
- * @return a new option
- */
- public static Option length(long value) {
- return new LengthOption(value);
- }
-
- /**
- * Create an option with the buffer size for reading the given pathname.
- * Only valid together with {@link #file(Path)}; the Reader constructor
- * rejects it when a stream is supplied.
- * @param value the number of bytes to buffer
- * @return a new option
- */
- public static Option bufferSize(int value) {
- return new BufferSizeOption(value);
- }
-
- /** Option holding the path of the file to read. */
- private static class FileOption extends Options.PathOption
- implements Option {
- private FileOption(Path value) {
- super(value);
- }
- }
-
- /** Option holding a caller-supplied input stream to read from. */
- private static class InputStreamOption
- extends Options.FSDataInputStreamOption
- implements Option {
- private InputStreamOption(FSDataInputStream value) {
- super(value);
- }
- }
-
- /** Option holding the position at which reading starts. */
- private static class StartOption extends Options.LongOption
- implements Option {
- private StartOption(long value) {
- super(value);
- }
- }
-
- /** Option holding the number of bytes to read. */
- private static class LengthOption extends Options.LongOption
- implements Option {
- private LengthOption(long value) {
- super(value);
- }
- }
-
- /** Option holding the buffer size used when the Reader opens the file itself. */
- private static class BufferSizeOption extends Options.IntegerOption
- implements Option {
- private BufferSizeOption(int value) {
- super(value);
- }
- }
-
- // only used directly
- // Its mere presence makes the Reader constructor request a header-only
- // ("temporary") reader; the boolean it stores is always true.
- private static class OnlyHeaderOption extends Options.BooleanOption
- implements Option {
- private OnlyHeaderOption() {
- super(true);
- }
- }
-
- /**
- * Construct a reader from a set of options. Exactly one of the file or
- * stream options must be given; a buffer size may only be given together
- * with a file.
- * @param conf the configuration to use
- * @param opts the options used when creating the reader
- * @throws IOException if the file cannot be opened or its header cannot be read
- * @throws IllegalArgumentException if the options are inconsistent
- */
- public Reader(Configuration conf, Option... opts) throws IOException {
- // Look up the options, these are null if not set
- FileOption fileOpt = Options.getOption(FileOption.class, opts);
- InputStreamOption streamOpt =
- Options.getOption(InputStreamOption.class, opts);
- StartOption startOpt = Options.getOption(StartOption.class, opts);
- LengthOption lenOpt = Options.getOption(LengthOption.class, opts);
- BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts);
- OnlyHeaderOption headerOnly =
- Options.getOption(OnlyHeaderOption.class, opts);
- // check for consistency
- // (true when both or neither of file/stream are set, i.e. not exactly one)
- if ((fileOpt == null) == (streamOpt == null)) {
- throw new
- IllegalArgumentException("File or stream option must be specified");
- }
- if (fileOpt == null && bufOpt != null) {
- throw new IllegalArgumentException("buffer size can only be set when" +
- " a file is specified.");
- }
- // figure out the real values
- Path filename = null;
- FSDataInputStream file;
- final long len;
- if (fileOpt != null) {
- filename = fileOpt.getValue();
- FileSystem fs = filename.getFileSystem(conf);
- int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue();
- // default length is the file's current length
- len = null == lenOpt
- ? fs.getFileStatus(filename).getLen()
- : lenOpt.getValue();
- file = openFile(fs, filename, bufSize, len);
- } else {
- // length of a caller-supplied stream is unknown unless given
- len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
- file = streamOpt.getValue();
- }
- long start = startOpt == null ? 0 : startOpt.getValue();
- // really set up
- initialize(filename, file, start, len, conf, headerOnly != null);
- }
-
- /**
- * Construct a reader by opening a file from the given file system.
- * The path is qualified against {@code fs} and then handed to
- * Reader(Configuration, Option...).
- * @param fs The file system used to qualify the file path.
- * @param file The file being read.
- * @param conf Configuration
- * @throws IOException
- * @deprecated Use Reader(Configuration, Option...) instead.
- */
- @Deprecated
- public Reader(FileSystem fs, Path file,
- Configuration conf) throws IOException {
- this(conf, file(file.makeQualified(fs)));
- }
-
- /**
- * Construct a reader by the given input stream. Equivalent to passing the
- * stream, start and length options to Reader(Configuration, Option...).
- * @param in An input stream.
- * @param buffersize unused
- * @param start The starting position.
- * @param length The length being read.
- * @param conf Configuration
- * @throws IOException
- * @deprecated Use Reader(Configuration, Reader.Option...) instead.
- */
- @Deprecated
- public Reader(FSDataInputStream in, int buffersize,
- long start, long length, Configuration conf) throws IOException {
- this(conf, stream(in), start(start), length(length));
- }
-
- /**
- * Common work of the constructors: records the stream and configuration,
- * seeks to the start position, computes the end position and reads the
- * header via init(). If any of that fails, the input stream is cleaned up
- * (closed) before the exception propagates -- including a stream supplied
- * by the caller.
- * @throws IllegalArgumentException if {@code in} is null
- */
- private void initialize(Path filename, FSDataInputStream in,
- long start, long length, Configuration conf,
- boolean tempReader) throws IOException {
- if (in == null) {
- throw new IllegalArgumentException("in == null");
- }
- this.filename = filename == null ? "<unknown>" : filename.toString();
- this.in = in;
- this.conf = conf;
- boolean succeeded = false;
- try {
- seek(start);
- this.end = this.in.getPos() + length;
- // if it wrapped around, use the max
- if (end < length) {
- end = Long.MAX_VALUE;
- }
- init(tempReader);
- succeeded = true;
- } finally {
- if (!succeeded) {
- IOUtils.cleanup(LOG, this.in);
- }
- }
- }
-
- /**
- * Override this method to specialize the type of
- * {@link FSDataInputStream} returned. This default implementation simply
- * opens the file with the given buffer size and ignores {@code length}.
- * @param fs The file system used to open the file.
- * @param file The file being read.
- * @param bufferSize The buffer size used to read the file.
- * @param length The length being read if it is >= 0. Otherwise,
- * the length is not available.
- * @return The opened stream.
- * @throws IOException
- */
- protected FSDataInputStream openFile(FileSystem fs, Path file,
- int bufferSize, long length) throws IOException {
- return fs.open(file, bufferSize);
- }
-
- /**
- * Initialize the {@link Reader}: parse the file header (version block,
- * key/value class names, compression flags, codec, metadata, sync marker)
- * and, unless this is a temporary reader, set up the decompression streams
- * and the key/value deserializers.
- * @param tempReader <code>true</code> if we are constructing a temporary
- * reader {@link SequenceFile.Sorter#cloneFileAttributes},
- * and hence do not initialize every component;
- * <code>false</code> otherwise.
- * @throws IOException if the header is not that of a SequenceFile, the
- * file's version is newer than this code supports, or no
- * deserializer is registered for the key or value class
- */
- private void init(boolean tempReader) throws IOException {
- byte[] versionBlock = new byte[VERSION.length];
- in.readFully(versionBlock);
-
- // the first three bytes must match the first three bytes of VERSION
- if ((versionBlock[0] != VERSION[0]) ||
- (versionBlock[1] != VERSION[1]) ||
- (versionBlock[2] != VERSION[2]))
- throw new IOException(this + " not a SequenceFile");
-
- // Set 'version'
- version = versionBlock[3];
- if (version > VERSION[3])
- throw new VersionMismatchException(VERSION[3], version);
-
- // files older than BLOCK_COMPRESS_VERSION store class names as UTF8,
- // newer ones as Text strings
- if (version < BLOCK_COMPRESS_VERSION) {
- UTF8 className = new UTF8();
-
- className.readFields(in);
- keyClassName = className.toString(); // key class name
-
- className.readFields(in);
- valClassName = className.toString(); // val class name
- } else {
- keyClassName = Text.readString(in);
- valClassName = Text.readString(in);
- }
-
- if (version > 2) { // if version > 2
- this.decompress = in.readBoolean(); // is compressed?
- } else {
- decompress = false;
- }
-
- if (version >= BLOCK_COMPRESS_VERSION) { // if version >= 4
- this.blockCompressed = in.readBoolean(); // is block-compressed?
- } else {
- blockCompressed = false;
- }
-
- // if version >= 5
- // setup the compression codec
- if (decompress) {
- if (version >= CUSTOM_COMPRESS_VERSION) {
- String codecClassname = Text.readString(in);
- try {
- Class<? extends CompressionCodec> codecClass
- = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class);
- this.codec = ReflectionUtils.newInstance(codecClass, conf);
- } catch (ClassNotFoundException cnfe) {
- throw new IllegalArgumentException("Unknown codec: " +
- codecClassname, cnfe);
- }
- } else {
- // older files do not name a codec; DefaultCodec is used
- codec = new DefaultCodec();
- ((Configurable)codec).setConf(conf);
- }
- }
-
- this.metadata = new Metadata();
- if (version >= VERSION_WITH_METADATA) { // if version >= 6
- this.metadata.readFields(in);
- }
-
- if (version > 1) { // if version > 1
- in.readFully(sync); // read sync bytes
- headerEnd = in.getPos(); // record end of header
- }
-
- // Initialize... *not* if we are constructing a temporary Reader
- if (!tempReader) {
- valBuffer = new DataInputBuffer();
- if (decompress) {
- valDecompressor = CodecPool.getDecompressor(codec);
- valInFilter = codec.createInputStream(valBuffer, valDecompressor);
- valIn = new DataInputStream(valInFilter);
- } else {
- valIn = valBuffer;
- }
-
- // block-compressed files need separate decompression pipelines for
- // key lengths, keys and value lengths
- if (blockCompressed) {
- keyLenBuffer = new DataInputBuffer();
- keyBuffer = new DataInputBuffer();
- valLenBuffer = new DataInputBuffer();
-
- keyLenDecompressor = CodecPool.getDecompressor(codec);
- keyLenInFilter = codec.createInputStream(keyLenBuffer,
- keyLenDecompressor);
- keyLenIn = new DataInputStream(keyLenInFilter);
-
- keyDecompressor = CodecPool.getDecompressor(codec);
- keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
- keyIn = new DataInputStream(keyInFilter);
-
- valLenDecompressor = CodecPool.getDecompressor(codec);
- valLenInFilter = codec.createInputStream(valLenBuffer,
- valLenDecompressor);
- valLenIn = new DataInputStream(valLenInFilter);
- }
-
- SerializationFactory serializationFactory =
- new SerializationFactory(conf);
- this.keyDeserializer =
- getDeserializer(serializationFactory, getKeyClass());
- if (this.keyDeserializer == null) {
- throw new IOException(
- "Could not find a deserializer for the Key class: '"
- + getKeyClass().getCanonicalName() + "'. "
- + "Please ensure that the configuration '" +
- CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
- + "properly configured, if you're using "
- + "custom serialization.");
- }
- // without block compression the key deserializer is opened on valBuffer;
- // with it, on the decompressed key stream
- if (!blockCompressed) {
- this.keyDeserializer.open(valBuffer);
- } else {
- this.keyDeserializer.open(keyIn);
- }
- this.valDeserializer =
- getDeserializer(serializationFactory, getValueClass());
- if (this.valDeserializer == null) {
- throw new IOException(
- "Could not find a deserializer for the Value class: '"
- + getValueClass().getCanonicalName() + "'. "
- + "Please ensure that the configuration '" +
- CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
- + "properly configured, if you're using "
- + "custom serialization.");
- }
- this.valDeserializer.open(valIn);
- }
- }
-
- /**
-  * Asks the given factory for the deserializer registered for a class.
-  * Raw types are used here, hence the unchecked-warning suppression.
-  *
-  * @param sf factory to query
-  * @param c class whose deserializer is wanted
-  * @return whatever {@code sf.getDeserializer(c)} returns; callers in this
-  *         class treat {@code null} as "no deserializer found"
-  */
- @SuppressWarnings("unchecked")
- private Deserializer getDeserializer(SerializationFactory sf, Class c) {
-   Deserializer found = sf.getDeserializer(c);
-   return found;
- }
-
- /**
-  * Close the file.
-  *
-  * <p>Hands all four decompressors back to the {@link CodecPool}, closes the
-  * key and value deserializers when they exist, and closes the underlying
-  * input stream. The input stream is closed even when closing one of the
-  * deserializers throws, so such a failure cannot leak the open file.
-  *
-  * @throws IOException if a deserializer or the input stream fails to close
-  */
- @Override
- public synchronized void close() throws IOException {
-   // Return the decompressors to the pool
-   CodecPool.returnDecompressor(keyLenDecompressor);
-   CodecPool.returnDecompressor(keyDecompressor);
-   CodecPool.returnDecompressor(valLenDecompressor);
-   CodecPool.returnDecompressor(valDecompressor);
-   keyLenDecompressor = keyDecompressor = null;
-   valLenDecompressor = valDecompressor = null;
-
-   try {
-     if (keyDeserializer != null) {
-       keyDeserializer.close();
-     }
-   } finally {
-     try {
-       if (valDeserializer != null) {
-         valDeserializer.close();
-       }
-     } finally {
-       // Close the input-stream, whatever happened above
-       in.close();
-     }
-   }
- }
-
- /**
-  * Returns the name of the key class.
-  *
-  * @return the key class name held by this reader
-  */
- public String getKeyClassName() {
-   return this.keyClassName;
- }
-
- /**
-  * Returns the class of keys in this file.
-  *
-  * <p>The class is resolved from {@link #getKeyClassName()} the first time it
-  * is needed and remembered for later calls.
-  *
-  * @return the key class
-  * @throws RuntimeException wrapping the {@link IOException} raised while
-  *         resolving the class name
-  */
- public synchronized Class<?> getKeyClass() {
-   if (keyClass != null) {
-     return keyClass;
-   }
-   try {
-     keyClass = WritableName.getClass(getKeyClassName(), conf);
-   } catch (IOException e) {
-     throw new RuntimeException(e);
-   }
-   return keyClass;
- }
-
- /**
-  * Returns the name of the value class.
-  *
-  * @return the value class name held by this reader
-  */
- public String getValueClassName() {
-   return this.valClassName;
- }
-
- /**
-  * Returns the class of values in this file.
-  *
-  * <p>The class is resolved from {@link #getValueClassName()} the first time
-  * it is needed and remembered for later calls.
-  *
-  * @return the value class
-  * @throws RuntimeException wrapping the {@link IOException} raised while
-  *         resolving the class name
-  */
- public synchronized Class<?> getValueClass() {
-   if (valClass != null) {
-     return valClass;
-   }
-   try {
-     valClass = WritableName.getClass(getValueClassName(), conf);
-   } catch (IOException e) {
-     throw new RuntimeException(e);
-   }
-   return valClass;
- }
-
- /**
-  * Returns true if values are compressed.
-  *
-  * @return the reader's {@code decompress} flag
-  */
- public boolean isCompressed() {
-   return this.decompress;
- }
-
- /**
-  * Returns true if records are block-compressed.
-  *
-  * @return the reader's {@code blockCompressed} flag
-  */
- public boolean isBlockCompressed() {
-   return this.blockCompressed;
- }
-
- /**
-  * Returns the compression codec of data in this file.
-  *
-  * @return the codec held by this reader
-  */
- public CompressionCodec getCompressionCodec() {
-   return this.codec;
- }
-
- /**
-  * Get the compression type for this file.
-  *
-  * @return {@code NONE} when values are not compressed, otherwise
-  *         {@code BLOCK} for block-compressed files and {@code RECORD} for
-  *         the remaining case
-  */
- public CompressionType getCompressionType() {
-   if (!decompress) {
-     return CompressionType.NONE;
-   }
-   if (blockCompressed) {
-     return CompressionType.BLOCK;
-   }
-   return CompressionType.RECORD;
- }
-
- /**
-  * Returns the metadata object of the file.
-  *
-  * @return the metadata instance held by this reader
-  */
- public Metadata getMetadata() {
-   return metadata;
- }
-
- /**
-  * Returns the configuration used for this file.
-  *
-  * @return the configuration held by this reader
-  */
- Configuration getConf() {
-   return this.conf;
- }
-
- /**
-  * Read a compressed buffer.
-  *
-  * <p>Reads a vint byte count from the file, copies that many bytes from the
-  * file into a scratch buffer, points {@code buffer} at those bytes, and
-  * finally resets {@code filter}.
-  *
-  * @param buffer input buffer to be re-pointed at the bytes just read
-  * @param filter compression stream whose state is reset afterwards
-  * @throws IOException if reading from the file fails
-  */
- private synchronized void readBuffer(DataInputBuffer buffer,
-     CompressionInputStream filter) throws IOException {
-   // Scratch space that receives the raw bytes from the file
-   DataOutputBuffer scratch = new DataOutputBuffer();
-
-   try {
-     int byteCount = WritableUtils.readVInt(in);
-     scratch.write(in, byteCount);
-
-     // Hook 'buffer' up to the bytes that were just read
-     buffer.reset(scratch.getData(), 0, scratch.getLength());
-   } finally {
-     scratch.close();
-   }
-
-   // Reset the codec
-   filter.resetState();
- }
-
- /**
- * Read the next 'compressed' block.
- *
- * Resets the buffered key/value/record counters, checks the sync bytes when
- * the file has them, reads the record count and then loads the key-length and
- * key buffers. The value-length and value buffers are loaded here only when
- * lazy decompression is off.
- *
- * @throws IOException if the sync bytes do not match or a read fails
- */
- private synchronized void readBlock() throws IOException {
- // Check if we need to throw away a whole block of
- // 'values' due to 'lazy decompression'
- if (lazyDecompress && !valuesDecompressed) {
- // Each seek skips one length-prefixed section. Operands are evaluated left
- // to right, so getPos() is taken after the vint has been consumed.
- // Presumably these are the value-lengths and values sections that the
- // previous block never loaded - confirm against the writer.
- in.seek(WritableUtils.readVInt(in)+in.getPos());
- in.seek(WritableUtils.readVInt(in)+in.getPos());
- }
-
- // Reset internal states
- noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0;
- valuesDecompressed = false;
-
- //Process sync
- if (sync != null) {
- // NOTE(review): the int read here is discarded; presumably it is the sync
- // escape marker that precedes the sync bytes - confirm.
- in.readInt();
- in.readFully(syncCheck); // read syncCheck
- if (!Arrays.equals(sync, syncCheck)) // check it
- throw new IOException("File is corrupt!");
- }
- syncSeen = true;
-
- // Read number of records in this block
- noBufferedRecords = WritableUtils.readVInt(in);
-
- // Read key lengths and keys
- readBuffer(keyLenBuffer, keyLenInFilter);
- readBuffer(keyBuffer, keyInFilter);
- noBufferedKeys = noBufferedRecords;
-
- // Read value lengths and values
- // (with lazy decompression this is deferred to seekToCurrentValue())
- if (!lazyDecompress) {
- readBuffer(valLenBuffer, valLenInFilter);
- readBuffer(valBuffer, valInFilter);
- noBufferedValues = noBufferedRecords;
- valuesDecompressed = true;
- }
- }
-
- /**
- * Position valLenIn/valIn to the 'value'
- * corresponding to the 'current' key.
- *
- * For files that are not block-compressed this resets the value filter (when
- * values are compressed) and the value buffer. For block-compressed files it
- * loads the value buffers if that was deferred, then skips over the values
- * that belong to keys read before the current one.
- *
- * @throws IOException if a read fails or the expected number of value bytes
- * cannot be skipped
- */
- private synchronized void seekToCurrentValue() throws IOException {
- if (!blockCompressed) {
- if (decompress) {
- valInFilter.resetState();
- }
- // Presumably rewinds to the mark that next(Writable) sets right after the
- // key has been read - confirm.
- valBuffer.reset();
- } else {
- // Check if this is the first value in the 'block' to be read
- if (lazyDecompress && !valuesDecompressed) {
- // Read the value lengths and values
- readBuffer(valLenBuffer, valLenInFilter);
- readBuffer(valBuffer, valInFilter);
- noBufferedValues = noBufferedRecords;
- valuesDecompressed = true;
- }
-
- // Calculate the no. of bytes to skip
- // Note: 'current' key has already been read!
- int skipValBytes = 0;
- // noBufferedKeys was already decremented for the current key, hence the +1
- int currentKey = noBufferedKeys + 1;
- // Consume one length entry for every value that precedes the current one
- for (int i=noBufferedValues; i > currentKey; --i) {
- skipValBytes += WritableUtils.readVInt(valLenIn);
- --noBufferedValues;
- }
-
- // Skip to the 'val' corresponding to 'current' key
- if (skipValBytes > 0) {
- if (valIn.skipBytes(skipValBytes) != skipValBytes) {
- throw new IOException("Failed to seek to " + currentKey +
- "(th) value!");
- }
- }
- }
- }
-
- /**
- * Get the 'value' corresponding to the last read 'key'.
- *
- * If <code>val</code> is Configurable, this reader's configuration is set on
- * it before the value is read.
- *
- * @param val : The 'value' to be read.
- * @throws IOException if reading fails, or if (non-block-compressed files
- * only) a non-zero byte is found after the value has been read
- */
- public synchronized void getCurrentValue(Writable val)
- throws IOException {
- if (val instanceof Configurable) {
- ((Configurable) val).setConf(this.conf);
- }
-
- // Position stream to 'current' value
- seekToCurrentValue();
-
- if (!blockCompressed) {
- val.readFields(valIn);
-
- // Probe for leftover data after the value.
- // NOTE(review): read() returns -1 at end of stream and 0..255 otherwise, so
- // '> 0' lets a trailing 0x00 byte through - confirm whether '>= 0' was meant.
- if (valIn.read() > 0) {
- LOG.info("available bytes: " + valIn.available());
- throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
- + " bytes, should read " +
- (valBuffer.getLength()-keyLength));
- }
- } else {
- // Get the value
- int valLength = WritableUtils.readVInt(valLenIn);
- val.readFields(valIn);
-
- // Read another compressed 'value'
- --noBufferedValues;
-
- // Sanity check
- // NOTE(review): the condition tests for a negative length while the message
- // says "zero-length" - confirm which one is intended.
- if ((valLength < 0) && LOG.isDebugEnabled()) {
- LOG.debug(val + " is a zero-length value");
- }
- }
-
- }
-
- /**
- * Get the 'value' corresponding to the last read 'key'.
- *
- * Same flow as the Writable overload, except that the value is produced by
- * the value deserializer and returned. If <code>val</code> is Configurable,
- * this reader's configuration is set on it first.
- *
- * @param val : The 'value' to be read.
- * @return the object returned by the value deserializer
- * @throws IOException if reading fails, or if (non-block-compressed files
- * only) a non-zero byte is found after the value has been read
- */
- public synchronized Object getCurrentValue(Object val)
- throws IOException {
- if (val instanceof Configurable) {
- ((Configurable) val).setConf(this.conf);
- }
-
- // Position stream to 'current' value
- seekToCurrentValue();
-
- if (!blockCompressed) {
- val = deserializeValue(val);
-
- // Probe for leftover data after the value.
- // NOTE(review): read() returns -1 at end of stream and 0..255 otherwise, so
- // '> 0' lets a trailing 0x00 byte through - confirm whether '>= 0' was meant.
- if (valIn.read() > 0) {
- LOG.info("available bytes: " + valIn.available());
- throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
- + " bytes, should read " +
- (valBuffer.getLength()-keyLength));
- }
- } else {
- // Get the value
- int valLength = WritableUtils.readVInt(valLenIn);
- val = deserializeValue(val);
-
- // Read another compressed 'value'
- --noBufferedValues;
-
- // Sanity check
- // NOTE(review): the condition tests for a negative length while the message
- // says "zero-length" - confirm which one is intended.
- if ((valLength < 0) && LOG.isDebugEnabled()) {
- LOG.debug(val + " is a zero-length value");
- }
- }
- return val;
-
- }
-
- /**
-  * Runs the value deserializer on the given object.
-  *
-  * @param val object handed to the deserializer
-  * @return whatever the value deserializer returns
-  * @throws IOException if deserialization fails
-  */
- @SuppressWarnings("unchecked")
- private Object deserializeValue(Object val) throws IOException {
-   Object decoded = valDeserializer.deserialize(val);
-   return decoded;
- }
-
- /**
-  * Read the next key in the file into <code>key</code>, skipping its
-  * value.
-  *
-  * @param key receives the key; its class must be exactly the key class of
-  *        this file
-  * @return true if another entry exists, and false at end of file
-  * @throws IOException if <code>key</code> has the wrong class, or if the
-  *         number of bytes consumed by the key differs from the recorded key
-  *         length (files that are not block-compressed)
-  */
- public synchronized boolean next(Writable key) throws IOException {
-   if (key.getClass() != getKeyClass()) {
-     throw new IOException("wrong key class: "+key.getClass().getName()
-         +" is not "+keyClass);
-   }
-
-   if (blockCompressed) {
-     //Reset syncSeen
-     syncSeen = false;
-
-     // Load a fresh block once all buffered keys have been handed out
-     if (noBufferedKeys == 0) {
-       try {
-         readBlock();
-       } catch (EOFException eof) {
-         return false;
-       }
-     }
-
-     int bufferedKeyLength = WritableUtils.readVInt(keyLenIn);
-
-     // Sanity check
-     if (bufferedKeyLength < 0) {
-       return false;
-     }
-
-     //Read another compressed 'key'
-     key.readFields(keyIn);
-     --noBufferedKeys;
-     return true;
-   }
-
-   // Record-oriented file: pull the raw record into outBuf first
-   outBuf.reset();
-   keyLength = next(outBuf);
-   if (keyLength < 0) {
-     return false;
-   }
-
-   valBuffer.reset(outBuf.getData(), outBuf.getLength());
-
-   key.readFields(valBuffer);
-   valBuffer.mark(0);
-   if (valBuffer.getPosition() != keyLength) {
-     throw new IOException(key + " read " + valBuffer.getPosition()
-         + " bytes, should read " + keyLength);
-   }
-   return true;
- }
-
- /**
-  * Read the next key/value pair in the file into <code>key</code> and
-  * <code>val</code>.
-  *
-  * @param key receives the key
-  * @param val receives the value; its class must be exactly the value class
-  *        of this file
-  * @return true if such a pair exists and false when at end of file
-  * @throws IOException if <code>val</code> has the wrong class or reading
-  *         fails
-  */
- public synchronized boolean next(Writable key, Writable val)
-     throws IOException {
-   if (val.getClass() != getValueClass()) {
-     // Report the offending class name, as the key check in next(Writable)
-     // does, rather than the value's own string form.
-     throw new IOException("wrong value class: "+val.getClass().getName()
-         +" is not "+valClass);
-   }
-
-   boolean more = next(key);
-
-   if (more) {
-     getCurrentValue(val);
-   }
-
-   return more;
- }
-
- /**
-  * Read and return the next record length, potentially skipping over
-  * a sync block. Also updates <code>syncSeen</code> to say whether a sync
-  * entry was passed on the way.
-  *
-  * @return the length of the next record or -1 if there is no next record
-  * @throws IOException if the sync bytes do not match or a read fails
-  */
- private synchronized int readRecordLength() throws IOException {
-   if (in.getPos() >= end) {
-     return -1;
-   }
-
-   int recordLength = in.readInt();
-   boolean atSyncEntry =
-       version > 1 && sync != null && recordLength == SYNC_ESCAPE;
-   if (!atSyncEntry) {
-     syncSeen = false;
-     return recordLength;
-   }
-
-   // process a sync entry
-   in.readFully(syncCheck); // read syncCheck
-   if (!Arrays.equals(sync, syncCheck)) { // check it
-     throw new IOException("File is corrupt!");
-   }
-   syncSeen = true;
-   if (in.getPos() >= end) {
-     return -1;
-   }
-   return in.readInt(); // re-read length
- }
-
- /**
-  * Read the next key/value pair in the file into <code>buffer</code>.
-  * Returns the length of the key read, or -1 if at end of file. The length
-  * of the value may be computed by calling buffer.getLength() before and
-  * after calls to this method.
-  *
-  * <p>When a checksum failure is reported while reading, it is handed to
-  * <code>handleChecksumException</code>; if that call returns normally the
-  * read is attempted again.
-  *
-  * @param buffer destination for the raw record bytes
-  * @return the key length, or -1 at end of file
-  * @throws IOException if the file is block-compressed or a read fails
-  * @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}.
-  */
- @Deprecated
- synchronized int next(DataOutputBuffer buffer) throws IOException {
-   // Unsupported for block-compressed sequence files
-   if (blockCompressed) {
-     throw new IOException("Unsupported call for block-compressed" +
-         " SequenceFiles - use SequenceFile.Reader.nextRaw(DataOutputBuffer, ValueBytes)");
-   }
-   // Retry in a loop instead of recursing, so that a long run of checksum
-   // failures cannot exhaust the call stack.
-   while (true) {
-     try {
-       int length = readRecordLength();
-       if (length == -1) {
-         return -1;
-       }
-       int keyLength = in.readInt();
-       buffer.write(in, length);
-       return keyLength;
-     } catch (ChecksumException e) { // checksum failure
-       handleChecksumException(e);
-     }
-   }
- }
-
- /**
-  * Creates an empty holder for raw value bytes that matches this file.
-  *
-  * @return a <code>CompressedBytes</code> bound to this reader's codec when
-  *         values are compressed and the file is not block-compressed,
-  *         otherwise an <code>UncompressedBytes</code>
-  */
- public ValueBytes createValueBytes() {
-   if (decompress && !blockCompressed) {
-     return new CompressedBytes(codec);
-   }
-   return new UncompressedBytes();
- }
-
- /**
- * Read 'raw' records.
- * @param key - The buffer into which the key is read
- * @param val - The 'raw' value
- * @return Returns the total record length or -1 for end of file
- * @throws IOException
- */
- public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val)
- throws IOException {
- if (!blockCompressed) {
- int length = readRecordLength();
- if (length == -1) {
- return -1;
- }
- int keyLength = in.readInt();
- int valLength = le
<TRUNCATED>