Posted to commits@accumulo.apache.org by vi...@apache.org on 2011/12/23 18:53:13 UTC
svn commit: r1222766 [2/3] - in /incubator/accumulo/trunk:
src/core/src/main/java/org/apache/accumulo/core/
src/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/
src/core/src/main/java/org/apache/accumulo/core/file/
src/core/src/main/java...
Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MySequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MySequenceFile.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MySequenceFile.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MySequenceFile.java Fri Dec 23 17:53:12 2011
@@ -18,20 +18,14 @@
package org.apache.accumulo.core.file.map;
-import java.io.BufferedOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
-import java.rmi.server.UID;
-import java.security.MessageDigest;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Comparator;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -40,40 +34,23 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.fs.ChecksumFileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VersionMismatchException;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableName;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.util.MergeSort;
-import org.apache.hadoop.util.NativeCodeLoader;
-import org.apache.hadoop.util.PriorityQueue;
-import org.apache.hadoop.util.Progress;
-import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
/**
@@ -255,344 +232,6 @@ public class MySequenceFile {
job.set("io.seqfile.compression.type", val.toString());
}
- /**
- * Construct the preferred type of MySequenceFile Writer.
- *
- * @param fs
- * The configured filesystem.
- * @param conf
- * The configuration.
- * @param name
- * The name of the file.
- * @param keyClass
- * The 'key' type.
- * @param valClass
- * The 'value' type.
- * @return Returns the handle to the constructed MySequenceFile Writer.
- */
- public static Writer createWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass) throws IOException {
- return createWriter(fs, conf, name, keyClass, valClass, getCompressionType(conf));
- }
-
- /**
- * Construct the preferred type of MySequenceFile Writer.
- *
- * @param fs
- * The configured filesystem.
- * @param conf
- * The configuration.
- * @param name
- * The name of the file.
- * @param keyClass
- * The 'key' type.
- * @param valClass
- * The 'value' type.
- * @param compressionType
- * The compression type.
- * @return Returns the handle to the constructed MySequenceFile Writer.
- */
- public static Writer createWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionType compressionType)
- throws IOException {
- return createWriter(fs, conf, name, keyClass, valClass, fs.getConf().getInt("io.file.buffer.size", 4096), fs.getDefaultReplication(),
- fs.getDefaultBlockSize(), compressionType, new DefaultCodec(), null, new Metadata());
- }
-
- /**
- * Construct the preferred type of MySequenceFile Writer.
- *
- * @param fs
- * The configured filesystem.
- * @param conf
- * The configuration.
- * @param name
- * The name of the file.
- * @param keyClass
- * The 'key' type.
- * @param valClass
- * The 'value' type.
- * @param compressionType
- * The compression type.
- * @param progress
- * The Progressable object to track progress.
- * @return Returns the handle to the constructed MySequenceFile Writer.
- */
- public static Writer createWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionType compressionType,
- Progressable progress) throws IOException {
- return createWriter(fs, conf, name, keyClass, valClass, fs.getConf().getInt("io.file.buffer.size", 4096), fs.getDefaultReplication(),
- fs.getDefaultBlockSize(), compressionType, new DefaultCodec(), progress, new Metadata());
- }
-
- /**
- * Construct the preferred type of MySequenceFile Writer.
- *
- * @param fs
- * The configured filesystem.
- * @param conf
- * The configuration.
- * @param name
- * The name of the file.
- * @param keyClass
- * The 'key' type.
- * @param valClass
- * The 'value' type.
- * @param compressionType
- * The compression type.
- * @param codec
- * The compression codec.
- * @return Returns the handle to the constructed MySequenceFile Writer.
- */
- public static Writer createWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionType compressionType,
- CompressionCodec codec) throws IOException {
- return createWriter(fs, conf, name, keyClass, valClass, fs.getConf().getInt("io.file.buffer.size", 4096), fs.getDefaultReplication(),
- fs.getDefaultBlockSize(), compressionType, codec, null, new Metadata());
- }
-
- /**
- * Construct the preferred type of MySequenceFile Writer.
- *
- * @param fs
- * The configured filesystem.
- * @param conf
- * The configuration.
- * @param name
- * The name of the file.
- * @param keyClass
- * The 'key' type.
- * @param valClass
- * The 'value' type.
- * @param compressionType
- * The compression type.
- * @param codec
- * The compression codec.
- * @param progress
- * The Progressable object to track progress.
- * @param metadata
- * The metadata of the file.
- * @return Returns the handle to the constructed MySequenceFile Writer.
- */
- public static Writer createWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionType compressionType,
- CompressionCodec codec, Progressable progress, Metadata metadata) throws IOException {
- return createWriter(fs, conf, name, keyClass, valClass, fs.getConf().getInt("io.file.buffer.size", 4096), fs.getDefaultReplication(),
- fs.getDefaultBlockSize(), compressionType, codec, progress, metadata);
- }
-
- /**
- * Construct the preferred type of MySequenceFile Writer.
- *
- * @param fs
- * The configured filesystem.
- * @param conf
- * The configuration.
- * @param name
- * The name of the file.
- * @param keyClass
- * The 'key' type.
- * @param valClass
- * The 'value' type.
- * @param bufferSize
- * buffer size for the underlying output stream.
- * @param replication
- * replication factor for the file.
- * @param blockSize
- * block size for the file.
- * @param compressionType
- * The compression type.
- * @param codec
- * The compression codec.
- * @param progress
- * The Progressable object to track progress.
- * @param metadata
- * The metadata of the file.
- * @return Returns the handle to the constructed MySequenceFile Writer.
- */
- public static Writer createWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, int bufferSize, short replication,
- long blockSize, CompressionType compressionType, CompressionCodec codec, Progressable progress, Metadata metadata) throws IOException {
- if ((codec instanceof GzipCodec) && !NativeCodeLoader.isNativeCodeLoaded() && !ZlibFactory.isNativeZlibLoaded(conf)) {
- throw new IllegalArgumentException("MySequenceFile doesn't work with " + "GzipCodec without native-hadoop code!");
- }
-
- Writer writer = null;
-
- if (compressionType == CompressionType.NONE) {
- writer = new Writer(fs, conf, name, keyClass, valClass, bufferSize, replication, blockSize, progress, metadata);
- } else if (compressionType == CompressionType.RECORD) {
- writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass, bufferSize, replication, blockSize, codec, progress, metadata);
- } else if (compressionType == CompressionType.BLOCK) {
- writer = new BlockCompressWriter(fs, conf, name, keyClass, valClass, bufferSize, replication, blockSize, codec, progress, metadata);
- }
-
- return writer;
- }
-
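
For reference, a minimal usage sketch of the factory methods above, invoking the seven-argument createWriter overload that dispatches on CompressionType. The path and the Text key/value classes are placeholders, and CompressionType is assumed to be the nested type tested in the dispatch logic above.

    import org.apache.accumulo.core.file.map.MySequenceFile;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.DefaultCodec;

    public class WriterUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // The factory returns a Writer, RecordCompressWriter, or BlockCompressWriter
        // depending on the CompressionType argument.
        MySequenceFile.Writer writer = MySequenceFile.createWriter(
            fs, conf, new Path("/tmp/example.seq"), Text.class, Text.class,
            MySequenceFile.CompressionType.BLOCK, new DefaultCodec());
        writer.append(new Text("row"), new Text("data"));
        writer.close();
      }
    }
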
- /**
- * Construct the preferred type of MySequenceFile Writer.
- *
- * @param fs
- * The configured filesystem.
- * @param conf
- * The configuration.
- * @param name
- * The name of the file.
- * @param keyClass
- * The 'key' type.
- * @param valClass
- * The 'value' type.
- * @param compressionType
- * The compression type.
- * @param codec
- * The compression codec.
- * @param progress
- * The Progressable object to track progress.
- * @return Returns the handle to the constructed MySequenceFile Writer.
- */
- public static Writer createWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionType compressionType,
- CompressionCodec codec, Progressable progress) throws IOException {
- Writer writer = createWriter(fs, conf, name, keyClass, valClass, compressionType, codec, progress, new Metadata());
- return writer;
- }
-
- /**
- * Construct the preferred type of 'raw' MySequenceFile Writer.
- *
- * @param out
- * The stream on top of which the writer is to be constructed.
- * @param keyClass
- * The 'key' type.
- * @param valClass
- * The 'value' type.
- * @param compress
- * Compress data?
- * @param blockCompress
- * Compress blocks?
- * @param metadata
- * The metadata of the file.
- * @return Returns the handle to the constructed MySequenceFile Writer.
- * @throws IOException
- */
- private static Writer createWriter(Configuration conf, FSDataOutputStream out, Class keyClass, Class valClass, boolean compress, boolean blockCompress,
- CompressionCodec codec, Metadata metadata) throws IOException {
- if (codec != null && (codec instanceof GzipCodec) && !NativeCodeLoader.isNativeCodeLoaded() && !ZlibFactory.isNativeZlibLoaded(conf)) {
- throw new IllegalArgumentException("MySequenceFile doesn't work with " + "GzipCodec without native-hadoop code!");
- }
-
- Writer writer = null;
-
- if (!compress) {
- writer = new Writer(conf, out, keyClass, valClass, metadata);
- } else if (compress && !blockCompress) {
- writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec, metadata);
- } else {
- writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec, metadata);
- }
-
- return writer;
- }
-
- /**
- * Construct the preferred type of 'raw' MySequenceFile Writer.
- *
- * @param fs
- * The configured filesystem.
- * @param conf
- * The configuration.
- * @param file
- * The name of the file.
- * @param keyClass
- * The 'key' type.
- * @param valClass
- * The 'value' type.
- * @param compress
- * Compress data?
- * @param blockCompress
- * Compress blocks?
- * @param codec
- * The compression codec.
- * @param progress
- * @param metadata
- * The metadata of the file.
- * @return Returns the handle to the constructed MySequenceFile Writer.
- * @throws IOException
- */
- private static Writer createWriter(FileSystem fs, Configuration conf, Path file, Class keyClass, Class valClass, boolean compress, boolean blockCompress,
- CompressionCodec codec, Progressable progress, Metadata metadata) throws IOException {
- if (codec != null && (codec instanceof GzipCodec) && !NativeCodeLoader.isNativeCodeLoaded() && !ZlibFactory.isNativeZlibLoaded(conf)) {
- throw new IllegalArgumentException("MySequenceFile doesn't work with " + "GzipCodec without native-hadoop code!");
- }
-
- Writer writer = null;
-
- if (!compress) {
- writer = new Writer(fs, conf, file, keyClass, valClass, progress, metadata);
- } else if (compress && !blockCompress) {
- writer = new RecordCompressWriter(fs, conf, file, keyClass, valClass, codec, progress, metadata);
- } else {
- writer = new BlockCompressWriter(fs, conf, file, keyClass, valClass, codec, progress, metadata);
- }
-
- return writer;
- }
-
- /**
- * Construct the preferred type of 'raw' MySequenceFile Writer.
- *
- * @param conf
- * The configuration.
- * @param out
- * The stream on top of which the writer is to be constructed.
- * @param keyClass
- * The 'key' type.
- * @param valClass
- * The 'value' type.
- * @param compressionType
- * The compression type.
- * @param codec
- * The compression codec.
- * @param metadata
- * The metadata of the file.
- * @return Returns the handle to the constructed MySequenceFile Writer.
- */
- public static Writer createWriter(Configuration conf, FSDataOutputStream out, Class keyClass, Class valClass, CompressionType compressionType,
- CompressionCodec codec, Metadata metadata) throws IOException {
- if ((codec instanceof GzipCodec) && !NativeCodeLoader.isNativeCodeLoaded() && !ZlibFactory.isNativeZlibLoaded(conf)) {
- throw new IllegalArgumentException("MySequenceFile doesn't work with " + "GzipCodec without native-hadoop code!");
- }
-
- Writer writer = null;
-
- if (compressionType == CompressionType.NONE) {
- writer = new Writer(conf, out, keyClass, valClass, metadata);
- } else if (compressionType == CompressionType.RECORD) {
- writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec, metadata);
- } else if (compressionType == CompressionType.BLOCK) {
- writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec, metadata);
- }
-
- return writer;
- }
-
- /**
- * Construct the preferred type of 'raw' MySequenceFile Writer.
- *
- * @param conf
- * The configuration.
- * @param out
- * The stream on top of which the writer is to be constructed.
- * @param keyClass
- * The 'key' type.
- * @param valClass
- * The 'value' type.
- * @param compressionType
- * The compression type.
- * @param codec
- * The compression codec.
- * @return Returns the handle to the constructed MySequenceFile Writer.
- */
- public static Writer createWriter(Configuration conf, FSDataOutputStream out, Class keyClass, Class valClass, CompressionType compressionType,
- CompressionCodec codec) throws IOException {
- Writer writer = createWriter(conf, out, keyClass, valClass, compressionType, codec, new Metadata());
- return writer;
- }
-
/** The interface to 'raw' values of SequenceFiles. */
public static interface ValueBytes {
@@ -801,534 +440,6 @@ public class MySequenceFile {
}
}
- /** Write key/value pairs to a sequence-format file. */
- public static class Writer implements java.io.Closeable {
- Configuration conf;
- FSDataOutputStream out;
- boolean ownOutputStream = true;
- DataOutputBuffer buffer = new DataOutputBuffer();
-
- Class keyClass;
- Class valClass;
-
- private boolean compress;
- CompressionCodec codec = null;
- CompressionOutputStream deflateFilter = null;
- DataOutputStream deflateOut = null;
- Metadata metadata = null;
- Compressor compressor = null;
-
- protected Serializer keySerializer;
- protected Serializer uncompressedValSerializer;
- protected Serializer compressedValSerializer;
-
- // Insert a globally unique 16-byte value every few entries, so that one
- // can seek into the middle of a file and then synchronize with record
- // starts and ends by scanning for this value.
- long lastSyncPos; // position of last sync
- byte[] sync; // 16 random bytes
- {
- try {
- MessageDigest digester = MessageDigest.getInstance("MD5");
- long time = System.currentTimeMillis();
- digester.update((new UID() + "@" + time).getBytes());
- sync = digester.digest();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- /** Implicit constructor: needed for the period of transition! */
- Writer() {}
-
- /** Create the named file. */
- public Writer(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass) throws IOException {
- this(fs, conf, name, keyClass, valClass, null, new Metadata());
- }
-
- /** Create the named file with write-progress reporter. */
- public Writer(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, Progressable progress, Metadata metadata) throws IOException {
- this(fs, conf, name, keyClass, valClass, fs.getConf().getInt("io.file.buffer.size", 4096), fs.getDefaultReplication(), fs.getDefaultBlockSize(),
- progress, metadata);
- }
-
- /** Create the named file with write-progress reporter. */
- public Writer(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, int bufferSize, short replication, long blockSize,
- Progressable progress, Metadata metadata) throws IOException {
- init(name, conf, fs.create(name, true, bufferSize, replication, blockSize, progress), keyClass, valClass, false, null, metadata);
- initializeFileHeader();
- writeFileHeader();
- finalizeFileHeader();
- }
-
- /** Write to an arbitrary stream using a specified buffer size. */
- private Writer(Configuration conf, FSDataOutputStream out, Class keyClass, Class valClass, Metadata metadata) throws IOException {
- this.ownOutputStream = false;
- init(null, conf, out, keyClass, valClass, false, null, metadata);
-
- initializeFileHeader();
- writeFileHeader();
- finalizeFileHeader();
- }
-
- /** Write the initial part of file header. */
- void initializeFileHeader() throws IOException {
- out.write(VERSION);
- }
-
- /** Write the final part of file header. */
- void finalizeFileHeader() throws IOException {
- out.write(sync); // write the sync bytes
- out.flush(); // flush header
- }
-
- boolean isCompressed() {
- return compress;
- }
-
- boolean isBlockCompressed() {
- return false;
- }
-
- /** Write and flush the file header. */
- void writeFileHeader() throws IOException {
- Text.writeString(out, keyClass.getName());
- Text.writeString(out, valClass.getName());
-
- out.writeBoolean(this.isCompressed());
- out.writeBoolean(this.isBlockCompressed());
-
- if (this.isCompressed()) {
- Text.writeString(out, (codec.getClass()).getName());
- }
- this.metadata.write(out);
- }
-
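
The three header methods above fix the on-disk header layout: the version marker, the key and value class names, two booleans (compressed, block-compressed), the codec class name when compression is on, the metadata, and finally the 16 sync bytes. A hedged sketch of reading that header back follows; it assumes the VERSION marker is the usual 4-byte 'SEQ' prefix used by Hadoop sequence files, and it stops before the metadata and sync bytes.

    import java.io.DataInputStream;
    import java.io.FileInputStream;
    import org.apache.hadoop.io.Text;

    public class HeaderReadSketch {
      public static void main(String[] args) throws Exception {
        DataInputStream in = new DataInputStream(new FileInputStream(args[0]));
        byte[] version = new byte[4];                 // assumed 4-byte version marker
        in.readFully(version);
        String keyClassName = Text.readString(in);    // written by Text.writeString above
        String valClassName = Text.readString(in);
        boolean compressed = in.readBoolean();
        boolean blockCompressed = in.readBoolean();
        String codecClassName = compressed ? Text.readString(in) : null;
        // Metadata and the 16 sync bytes follow; they are not parsed in this sketch.
        System.out.println(keyClassName + " / " + valClassName + " codec=" + codecClassName);
        in.close();
      }
    }
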
- /** Initialize. */
-
- void init(Path name, Configuration conf, FSDataOutputStream out, Class keyClass, Class valClass, boolean compress, CompressionCodec codec, Metadata metadata)
- throws IOException {
- this.conf = conf;
- this.out = out;
- this.keyClass = keyClass;
- this.valClass = valClass;
- this.compress = compress;
- this.codec = codec;
- this.metadata = metadata;
- SerializationFactory serializationFactory = new SerializationFactory(conf);
- this.keySerializer = serializationFactory.getSerializer(keyClass);
- this.keySerializer.open(buffer);
- this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
- this.uncompressedValSerializer.open(buffer);
- if (this.codec != null) {
- ReflectionUtils.setConf(this.codec, this.conf);
- this.compressor = CodecPool.getCompressor(this.codec);
- this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
- this.deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter));
- this.compressedValSerializer = serializationFactory.getSerializer(valClass);
- this.compressedValSerializer.open(deflateOut);
- }
- }
-
- /** Returns the class of keys in this file. */
- public Class getKeyClass() {
- return keyClass;
- }
-
- /** Returns the class of values in this file. */
- public Class getValueClass() {
- return valClass;
- }
-
- /** Returns the compression codec of data in this file. */
- public CompressionCodec getCompressionCodec() {
- return codec;
- }
-
- /** create a sync point */
- public void sync() throws IOException {
- if (sync != null && lastSyncPos != out.getPos()) {
- out.writeInt(SYNC_ESCAPE); // mark the start of the sync
- out.write(sync); // write sync
- lastSyncPos = out.getPos(); // update lastSyncPos
- }
- }
-
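
The sync() method above writes a SYNC_ESCAPE marker in place of a record length, followed by the file's 16 sync bytes, so a reader can verify that a given offset really is a record boundary. A minimal sketch of that check, assuming SYNC_ESCAPE is the value -1 as in Hadoop's SequenceFile; the method name is illustrative.

    import java.io.DataInputStream;
    import java.util.Arrays;

    public class SyncCheckSketch {
      static final int SYNC_ESCAPE = -1;   // assumed value of the escape marker

      // Returns true if the stream is currently positioned at a sync block.
      static boolean atSyncPoint(DataInputStream in, byte[] fileSyncBytes) throws Exception {
        if (in.readInt() != SYNC_ESCAPE)
          return false;
        byte[] seen = new byte[fileSyncBytes.length];
        in.readFully(seen);
        return Arrays.equals(seen, fileSyncBytes);
      }
    }
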
- /** Returns the configuration of this file. */
- Configuration getConf() {
- return conf;
- }
-
- /** Close the file. */
- public synchronized void close() throws IOException {
- keySerializer.close();
- uncompressedValSerializer.close();
- if (compressedValSerializer != null) {
- compressedValSerializer.close();
- }
-
- CodecPool.returnCompressor(compressor);
- compressor = null;
-
- if (out != null) {
-
- // Close the underlying stream iff we own it...
- if (ownOutputStream) {
- out.close();
- } else {
- out.flush();
- }
- out = null;
- }
- }
-
- synchronized void checkAndWriteSync() throws IOException {
- if (sync != null && out.getPos() >= lastSyncPos + SYNC_INTERVAL) { // time to emit sync
- sync();
- }
- }
-
- /** Append a key/value pair. */
- public synchronized void append(Writable key, Writable val) throws IOException {
- append((Object) key, (Object) val);
- }
-
- /** Append a key/value pair. */
-
- public synchronized void append(Object key, Object val) throws IOException {
- if (key.getClass() != keyClass)
- throw new IOException("wrong key class: " + key.getClass().getName() + " is not " + keyClass);
- if (val.getClass() != valClass)
- throw new IOException("wrong value class: " + val.getClass().getName() + " is not " + valClass);
-
- buffer.reset();
-
- // Append the 'key'
- keySerializer.serialize(key);
- int keyLength = buffer.getLength();
- if (keyLength < 0)
- throw new IOException("negative length keys not allowed: " + key);
-
- // Append the 'value'
- if (compress) {
- deflateFilter.resetState();
- compressedValSerializer.serialize(val);
- deflateOut.flush();
- deflateFilter.finish();
- } else {
- uncompressedValSerializer.serialize(val);
- }
-
- // Write the record out
- checkAndWriteSync(); // sync
- out.writeInt(buffer.getLength()); // total record length
- out.writeInt(keyLength); // key portion length
- out.write(buffer.getData(), 0, buffer.getLength()); // data
- }
-
- public synchronized void appendRaw(byte[] keyData, int keyOffset, int keyLength, ValueBytes val) throws IOException {
- if (keyLength < 0)
- throw new IOException("negative length keys not allowed: " + keyLength);
-
- int valLength = val.getSize();
-
- checkAndWriteSync();
-
- out.writeInt(keyLength + valLength); // total record length
- out.writeInt(keyLength); // key portion length
- out.write(keyData, keyOffset, keyLength); // key
- val.writeUncompressedBytes(out); // value
- }
-
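
Both append() and appendRaw() above emit the same uncompressed record frame: a 4-byte total record length, a 4-byte key length, then the key bytes immediately followed by the value bytes. A self-contained sketch of that framing (helper names are illustrative):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;

    public class RecordFrameSketch {
      static byte[] frame(byte[] key, byte[] value) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(key.length + value.length); // total record length
        out.writeInt(key.length);                // key portion length
        out.write(key);                          // key data
        out.write(value);                        // value data, uncompressed
        return bytes.toByteArray();
      }
    }
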
- /**
- * Returns the current length of the output file.
- *
- * <p>
- * This always returns a synchronized position. In other words, immediately after calling {@link MySequenceFile.Reader#seek(long)} with a position returned
- * by this method, {@link MySequenceFile.Reader#next(Writable)} may be called. However the key may be earlier in the file than the key last written when this
- * method was called (e.g., with block-compression, it may be the first key in the block that was being written when this method was called).
- */
- public synchronized long getLength() throws IOException {
- return out.getPos();
- }
-
- } // class Writer
-
- /** Write key/compressed-value pairs to a sequence-format file. */
- static class RecordCompressWriter extends Writer {
-
- /** Create the named file. */
- public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionCodec codec) throws IOException {
- this(conf, fs.create(name), keyClass, valClass, codec, new Metadata());
- }
-
- /** Create the named file with write-progress reporter. */
- public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionCodec codec, Progressable progress,
- Metadata metadata) throws IOException {
- this(fs, conf, name, keyClass, valClass, fs.getConf().getInt("io.file.buffer.size", 4096), fs.getDefaultReplication(), fs.getDefaultBlockSize(), codec,
- progress, metadata);
- }
-
- /** Create the named file with write-progress reporter. */
- public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, int bufferSize, short replication,
- long blockSize, CompressionCodec codec, Progressable progress, Metadata metadata) throws IOException {
- super.init(name, conf, fs.create(name, true, bufferSize, replication, blockSize, progress), keyClass, valClass, true, codec, metadata);
-
- initializeFileHeader();
- writeFileHeader();
- finalizeFileHeader();
- }
-
- /** Create the named file with write-progress reporter. */
- public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionCodec codec, Progressable progress)
- throws IOException {
- this(fs, conf, name, keyClass, valClass, codec, progress, new Metadata());
- }
-
- /** Write to an arbitrary stream using a specified buffer size. */
- private RecordCompressWriter(Configuration conf, FSDataOutputStream out, Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
- throws IOException {
- this.ownOutputStream = false;
- super.init(null, conf, out, keyClass, valClass, true, codec, metadata);
-
- initializeFileHeader();
- writeFileHeader();
- finalizeFileHeader();
-
- }
-
- boolean isCompressed() {
- return true;
- }
-
- boolean isBlockCompressed() {
- return false;
- }
-
- /** Append a key/value pair. */
-
- public synchronized void append(Object key, Object val) throws IOException {
- if (key.getClass() != keyClass)
- throw new IOException("wrong key class: " + key.getClass().getName() + " is not " + keyClass);
- if (val.getClass() != valClass)
- throw new IOException("wrong value class: " + val.getClass().getName() + " is not " + valClass);
-
- buffer.reset();
-
- // Append the 'key'
- keySerializer.serialize(key);
- int keyLength = buffer.getLength();
- if (keyLength < 0)
- throw new IOException("negative length keys not allowed: " + key);
-
- // Compress 'value' and append it
- deflateFilter.resetState();
- compressedValSerializer.serialize(val);
- deflateOut.flush();
- deflateFilter.finish();
-
- // Write the record out
- checkAndWriteSync(); // sync
- out.writeInt(buffer.getLength()); // total record length
- out.writeInt(keyLength); // key portion length
- out.write(buffer.getData(), 0, buffer.getLength()); // data
- }
-
- /** Append a key/value pair. */
- public synchronized void appendRaw(byte[] keyData, int keyOffset, int keyLength, ValueBytes val) throws IOException {
-
- if (keyLength < 0)
- throw new IOException("negative length keys not allowed: " + keyLength);
-
- int valLength = val.getSize();
-
- checkAndWriteSync(); // sync
- out.writeInt(keyLength + valLength); // total record length
- out.writeInt(keyLength); // key portion length
- out.write(keyData, keyOffset, keyLength); // 'key' data
- val.writeCompressedBytes(out); // 'value' data
- }
-
- } // RecordCompressWriter
-
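
RecordCompressWriter keeps the frame above but routes only the value bytes through the codec, leaving keys uncompressed so they can still be scanned cheaply. A hedged sketch of that idea; the class and helper names are illustrative, and DefaultCodec stands in for whatever codec the writer was configured with.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.io.compress.DefaultCodec;
    import org.apache.hadoop.util.ReflectionUtils;

    public class RecordCompressSketch {
      static byte[] frame(byte[] key, byte[] value) throws Exception {
        Configuration conf = new Configuration();
        CompressionCodec codec = ReflectionUtils.newInstance(DefaultCodec.class, conf);

        ByteArrayOutputStream record = new ByteArrayOutputStream();
        record.write(key);                                   // key stays uncompressed
        CompressionOutputStream deflate = codec.createOutputStream(record);
        deflate.write(value);                                // value is compressed
        deflate.finish();

        ByteArrayOutputStream framed = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(framed);
        out.writeInt(record.size());                         // total record length
        out.writeInt(key.length);                            // key portion length
        record.writeTo(out);
        return framed.toByteArray();
      }
    }
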
- /** Write compressed key/value blocks to a sequence-format file. */
- static class BlockCompressWriter extends Writer {
-
- private int noBufferedRecords = 0;
-
- private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
- private DataOutputBuffer keyBuffer = new DataOutputBuffer();
-
- private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
- private DataOutputBuffer valBuffer = new DataOutputBuffer();
-
- private int compressionBlockSize;
-
- /** Create the named file. */
- public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionCodec codec) throws IOException {
- this(fs, conf, name, keyClass, valClass, fs.getConf().getInt("io.file.buffer.size", 4096), fs.getDefaultReplication(), fs.getDefaultBlockSize(), codec,
- null, new Metadata());
- }
-
- /** Create the named file with write-progress reporter. */
- public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionCodec codec, Progressable progress,
- Metadata metadata) throws IOException {
- this(fs, conf, name, keyClass, valClass, fs.getConf().getInt("io.file.buffer.size", 4096), fs.getDefaultReplication(), fs.getDefaultBlockSize(), codec,
- progress, metadata);
- }
-
- /** Create the named file with write-progress reporter. */
- public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, int bufferSize, short replication, long blockSize,
- CompressionCodec codec, Progressable progress, Metadata metadata) throws IOException {
- super.init(name, conf, fs.create(name, true, bufferSize, replication, blockSize, progress), keyClass, valClass, true, codec, metadata);
- init(conf.getInt("io.seqfile.compress.blocksize", 1000000));
-
- initializeFileHeader();
- writeFileHeader();
- finalizeFileHeader();
- }
-
- /** Create the named file with write-progress reporter. */
- public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionCodec codec, Progressable progress)
- throws IOException {
- this(fs, conf, name, keyClass, valClass, codec, progress, new Metadata());
- }
-
- /** Write to an arbitrary stream using a specified buffer size. */
- private BlockCompressWriter(Configuration conf, FSDataOutputStream out, Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
- throws IOException {
- this.ownOutputStream = false;
- super.init(null, conf, out, keyClass, valClass, true, codec, metadata);
- init(1000000);
-
- initializeFileHeader();
- writeFileHeader();
- finalizeFileHeader();
- }
-
- boolean isCompressed() {
- return true;
- }
-
- boolean isBlockCompressed() {
- return true;
- }
-
- /** Initialize */
- void init(int compressionBlockSize) throws IOException {
- this.compressionBlockSize = compressionBlockSize;
- keySerializer.close();
- keySerializer.open(keyBuffer);
- uncompressedValSerializer.close();
- uncompressedValSerializer.open(valBuffer);
- }
-
- /** Workhorse to check and write out compressed data/lengths */
- private synchronized void writeBuffer(DataOutputBuffer uncompressedDataBuffer) throws IOException {
- deflateFilter.resetState();
- buffer.reset();
- deflateOut.write(uncompressedDataBuffer.getData(), 0, uncompressedDataBuffer.getLength());
- deflateOut.flush();
- deflateFilter.finish();
-
- WritableUtils.writeVInt(out, buffer.getLength());
- out.write(buffer.getData(), 0, buffer.getLength());
- }
-
- /** Compress and flush contents to dfs */
- public synchronized void sync() throws IOException {
- if (noBufferedRecords > 0) {
- super.sync();
-
- // No. of records
- WritableUtils.writeVInt(out, noBufferedRecords);
-
- // Write 'keys' and lengths
- writeBuffer(keyLenBuffer);
- writeBuffer(keyBuffer);
-
- // Write 'values' and lengths
- writeBuffer(valLenBuffer);
- writeBuffer(valBuffer);
-
- // Flush the file-stream
- out.flush();
-
- // Reset internal states
- keyLenBuffer.reset();
- keyBuffer.reset();
- valLenBuffer.reset();
- valBuffer.reset();
- noBufferedRecords = 0;
- }
-
- }
-
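
When the buffered data reaches the compression block size, sync() above writes one block: an optional sync marker, the record count as a vint, then four length-prefixed compressed buffers holding key lengths, keys, value lengths, and values. A sketch of that layout, with writeBlob standing in for the removed writeBuffer() and the buffers assumed to be already compressed:

    import java.io.DataOutputStream;
    import org.apache.hadoop.io.WritableUtils;

    public class BlockLayoutSketch {
      static void writeBlob(DataOutputStream out, byte[] compressed) throws Exception {
        WritableUtils.writeVInt(out, compressed.length); // vint length prefix
        out.write(compressed);
      }

      static void writeBlock(DataOutputStream out, int numRecords,
          byte[] keyLens, byte[] keys, byte[] valLens, byte[] vals) throws Exception {
        WritableUtils.writeVInt(out, numRecords); // number of buffered records
        writeBlob(out, keyLens);
        writeBlob(out, keys);
        writeBlob(out, valLens);
        writeBlob(out, vals);
      }
    }
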
- /** Close the file. */
- public synchronized void close() throws IOException {
- if (out != null) {
- sync();
- }
- super.close();
- }
-
- /** Append a key/value pair. */
-
- public synchronized void append(Object key, Object val) throws IOException {
- if (key.getClass() != keyClass)
- throw new IOException("wrong key class: " + key + " is not " + keyClass);
- if (val.getClass() != valClass)
- throw new IOException("wrong value class: " + val + " is not " + valClass);
-
- // Save key/value into respective buffers
- int oldKeyLength = keyBuffer.getLength();
- keySerializer.serialize(key);
- int keyLength = keyBuffer.getLength() - oldKeyLength;
- if (keyLength < 0)
- throw new IOException("negative length keys not allowed: " + key);
- WritableUtils.writeVInt(keyLenBuffer, keyLength);
-
- int oldValLength = valBuffer.getLength();
- uncompressedValSerializer.serialize(val);
- int valLength = valBuffer.getLength() - oldValLength;
- WritableUtils.writeVInt(valLenBuffer, valLength);
-
- // Added another key/value pair
- ++noBufferedRecords;
-
- // Compress and flush?
- int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
- if (currentBlockSize >= compressionBlockSize) {
- sync();
- }
- }
-
- /** Append a key/value pair. */
- public synchronized void appendRaw(byte[] keyData, int keyOffset, int keyLength, ValueBytes val) throws IOException {
-
- if (keyLength < 0)
- throw new IOException("negative length keys not allowed");
-
- int valLength = val.getSize();
-
- // Save key/value data in relevant buffers
- WritableUtils.writeVInt(keyLenBuffer, keyLength);
- keyBuffer.write(keyData, keyOffset, keyLength);
- WritableUtils.writeVInt(valLenBuffer, valLength);
- val.writeUncompressedBytes(valBuffer);
-
- // Added another key/value pair
- ++noBufferedRecords;
-
- // Compress and flush?
- int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
- if (currentBlockSize >= compressionBlockSize) {
- sync();
- }
- }
-
- } // BlockCompressWriter
-
/** Reads key/value pairs from a sequence-format file. */
public static class Reader implements java.io.Closeable {
private Path file;
@@ -2167,1008 +1278,4 @@ public class MySequenceFile {
}
- /**
- * Sorts key/value pairs in a sequence-format file.
- *
- * <p>
- * For best performance, applications should make sure that the {@link Writable#readFields(DataInput)} implementation of their keys is very efficient. In
- * particular, it should avoid allocating memory.
- */
- public static class Sorter {
-
- private RawComparator comparator;
-
- private MergeSort mergeSort; // the implementation of merge sort
-
- private Path[] inFiles; // when merging or sorting
-
- private Path outFile;
-
- private int memory; // bytes
- private int factor; // merged per pass
-
- private FileSystem fs = null;
-
- private Class keyClass;
- private Class valClass;
-
- private Configuration conf;
-
- private Progressable progressable = null;
-
- /** Sort and merge files containing the named classes. */
- public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass, Class valClass, Configuration conf) {
- this(fs, WritableComparator.get(keyClass), keyClass, valClass, conf);
- }
-
- /** Sort and merge using an arbitrary {@link RawComparator}. */
- public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, Class valClass, Configuration conf) {
- this.fs = fs;
- this.comparator = comparator;
- this.keyClass = keyClass;
- this.valClass = valClass;
- this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;
- this.factor = conf.getInt("io.sort.factor", 100);
- this.conf = conf;
- }
-
- /** Set the number of streams to merge at once. */
- public void setFactor(int factor) {
- this.factor = factor;
- }
-
- /** Get the number of streams to merge at once. */
- public int getFactor() {
- return factor;
- }
-
- /** Set the total amount of buffer memory, in bytes. */
- public void setMemory(int memory) {
- this.memory = memory;
- }
-
- /** Get the total amount of buffer memory, in bytes. */
- public int getMemory() {
- return memory;
- }
-
- /** Set the progressable object in order to report progress. */
- public void setProgressable(Progressable progressable) {
- this.progressable = progressable;
- }
-
- /**
- * Perform a file sort from a set of input files into an output file.
- *
- * @param inFiles
- * the files to be sorted
- * @param outFile
- * the sorted output file
- * @param deleteInput
- * should the input files be deleted as they are read?
- */
- public void sort(Path[] inFiles, Path outFile, boolean deleteInput) throws IOException {
- if (fs.exists(outFile)) {
- throw new IOException("already exists: " + outFile);
- }
-
- this.inFiles = inFiles;
- this.outFile = outFile;
-
- int segments = sortPass(deleteInput);
- if (segments > 1) {
- mergePass(outFile.getParent());
- }
- }
-
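
A hypothetical end-to-end use of the Sorter API above; the input and output paths are placeholders and Text is only one possible key/value class. sort() runs the in-memory sort pass and, when it produced more than one segment, a follow-up merge pass into the final file.

    import org.apache.accumulo.core.file.map.MySequenceFile;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;

    public class SorterUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        MySequenceFile.Sorter sorter =
            new MySequenceFile.Sorter(fs, Text.class, Text.class, conf);
        sorter.setFactor(50);                  // merge at most 50 segments per pass
        sorter.setMemory(64 * 1024 * 1024);    // 64 MB sort buffer
        Path[] inputs = { new Path("/tmp/in-0.seq"), new Path("/tmp/in-1.seq") };
        sorter.sort(inputs, new Path("/tmp/sorted.seq"), false /* keep inputs */);
      }
    }
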
- /**
- * Perform a file sort from a set of input files and return an iterator.
- *
- * @param inFiles
- * the files to be sorted
- * @param tempDir
- * the directory where temp files are created during sort
- * @param deleteInput
- * should the input files be deleted as they are read?
- * @return the RawKeyValueIterator
- */
- public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, boolean deleteInput) throws IOException {
- Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
- if (fs.exists(outFile)) {
- throw new IOException("already exists: " + outFile);
- }
- this.inFiles = inFiles;
- // outFile will basically be used as prefix for temp files in the cases
- // where sort outputs multiple sorted segments. For the single segment
- // case, the outputFile itself will contain the sorted data for that
- // segment
- this.outFile = outFile;
-
- int segments = sortPass(deleteInput);
- if (segments > 1)
- return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), tempDir);
- else if (segments == 1)
- return merge(new Path[] {outFile}, true, tempDir);
- else
- return null;
- }
-
- /**
- * The backwards compatible interface to sort.
- *
- * @param inFile
- * the input file to sort
- * @param outFile
- * the sorted output file
- */
- public void sort(Path inFile, Path outFile) throws IOException {
- sort(new Path[] {inFile}, outFile, false);
- }
-
- private int sortPass(boolean deleteInput) throws IOException {
- LOG.debug("running sort pass");
- SortPass sortPass = new SortPass(); // make the SortPass
- sortPass.setProgressable(progressable);
- mergeSort = new MergeSort(sortPass.new SeqFileComparator());
- try {
- return sortPass.run(deleteInput); // run it
- } finally {
- sortPass.close(); // close it
- }
- }
-
- private class SortPass {
- private int memoryLimit = memory / 4;
- private int recordLimit = 1000000;
-
- private DataOutputBuffer rawKeys = new DataOutputBuffer();
- private byte[] rawBuffer;
-
- private int[] keyOffsets = new int[1024];
- private int[] pointers = new int[keyOffsets.length];
- private int[] pointersCopy = new int[keyOffsets.length];
- private int[] keyLengths = new int[keyOffsets.length];
- private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length];
-
- private ArrayList segmentLengths = new ArrayList();
-
- private Reader in = null;
- private FSDataOutputStream out = null;
- private FSDataOutputStream indexOut = null;
- private Path outName;
-
- private Progressable progressable = null;
-
- public int run(boolean deleteInput) throws IOException {
- int segments = 0;
- int currentFile = 0;
- boolean atEof = (currentFile >= inFiles.length);
- boolean isCompressed = false;
- boolean isBlockCompressed = false;
- CompressionCodec codec = null;
- segmentLengths.clear();
- if (atEof) {
- return 0;
- }
-
- // Initialize
- in = new Reader(fs, inFiles[currentFile], conf);
- isCompressed = in.isCompressed();
- isBlockCompressed = in.isBlockCompressed();
- codec = in.getCompressionCodec();
-
- for (int i = 0; i < rawValues.length; ++i) {
- rawValues[i] = null;
- }
-
- while (!atEof) {
- int count = 0;
- int bytesProcessed = 0;
- rawKeys.reset();
- while (!atEof && bytesProcessed < memoryLimit && count < recordLimit) {
-
- // Read a record into buffer
- // Note: Attempt to re-use 'rawValue' as far as possible
- int keyOffset = rawKeys.getLength();
- ValueBytes rawValue = (count == keyOffsets.length || rawValues[count] == null) ? in.createValueBytes() : rawValues[count];
- int recordLength = in.nextRaw(rawKeys, rawValue);
- if (recordLength == -1) {
- in.close();
- if (deleteInput) {
- fs.delete(inFiles[currentFile], true);
- }
- currentFile += 1;
- atEof = currentFile >= inFiles.length;
- if (!atEof) {
- in = new Reader(fs, inFiles[currentFile], conf);
- } else {
- in = null;
- }
- continue;
- }
-
- int keyLength = rawKeys.getLength() - keyOffset;
-
- if (count == keyOffsets.length)
- grow();
-
- keyOffsets[count] = keyOffset; // update pointers
- pointers[count] = count;
- keyLengths[count] = keyLength;
- rawValues[count] = rawValue;
-
- bytesProcessed += recordLength;
- count++;
- }
-
- // buffer is full -- sort & flush it
- LOG.debug("flushing segment " + segments);
- rawBuffer = rawKeys.getData();
- sort(count);
- // indicate we're making progress
- if (progressable != null) {
- progressable.progress();
- }
- flush(count, bytesProcessed, isCompressed, isBlockCompressed, codec, segments == 0 && atEof);
- segments++;
- }
- return segments;
- }
-
- public void close() throws IOException {
- if (in != null) {
- in.close();
- }
- if (out != null) {
- out.close();
- }
- if (indexOut != null) {
- indexOut.close();
- }
- }
-
- private void grow() {
- int newLength = keyOffsets.length * 3 / 2;
- keyOffsets = grow(keyOffsets, newLength);
- pointers = grow(pointers, newLength);
- pointersCopy = new int[newLength];
- keyLengths = grow(keyLengths, newLength);
- rawValues = grow(rawValues, newLength);
- }
-
- private int[] grow(int[] old, int newLength) {
- int[] result = new int[newLength];
- System.arraycopy(old, 0, result, 0, old.length);
- return result;
- }
-
- private ValueBytes[] grow(ValueBytes[] old, int newLength) {
- ValueBytes[] result = new ValueBytes[newLength];
- System.arraycopy(old, 0, result, 0, old.length);
- for (int i = old.length; i < newLength; ++i) {
- result[i] = null;
- }
- return result;
- }
-
- private void flush(int count, int bytesProcessed, boolean isCompressed, boolean isBlockCompressed, CompressionCodec codec, boolean done)
- throws IOException {
- if (out == null) {
- outName = done ? outFile : outFile.suffix(".0");
- out = fs.create(outName);
- if (!done) {
- indexOut = fs.create(outName.suffix(".index"));
- }
- }
-
- long segmentStart = out.getPos();
- Writer writer = createWriter(conf, out, keyClass, valClass, isCompressed, isBlockCompressed, codec, new Metadata());
-
- if (!done) {
- writer.sync = null; // disable sync on temp files
- }
-
- for (int i = 0; i < count; i++) { // write in sorted order
- int p = pointers[i];
- writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
- }
- writer.close();
-
- if (!done) {
- // Save the segment length
- WritableUtils.writeVLong(indexOut, segmentStart);
- WritableUtils.writeVLong(indexOut, (out.getPos() - segmentStart));
- indexOut.flush();
- }
- }
-
- private void sort(int count) {
- System.arraycopy(pointers, 0, pointersCopy, 0, count);
- mergeSort.mergeSort(pointersCopy, pointers, 0, count);
- }
-
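
sort(count) above sorts the pointers array rather than the keys themselves: the comparator reaches into the shared rawBuffer via keyOffsets/keyLengths, so only small ints move during the sort. A self-contained illustration of that pointer-sort trick, with a plain lexicographic byte comparison standing in for the RawComparator:

    import java.util.Arrays;

    public class PointerSortSketch {
      // Lexicographic byte comparison, a stand-in for the configured RawComparator.
      static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
          int d = (a[i] & 0xff) - (b[i] & 0xff);
          if (d != 0)
            return d;
        }
        return a.length - b.length;
      }

      public static Integer[] sortPointers(byte[][] keys) {
        Integer[] pointers = new Integer[keys.length];
        for (int i = 0; i < pointers.length; i++)
          pointers[i] = i;
        // Only the index array is reordered; the key bytes never move.
        Arrays.sort(pointers, (a, b) -> compareBytes(keys[a], keys[b]));
        return pointers;
      }
    }
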
- class SeqFileComparator implements Comparator<IntWritable> {
- public int compare(IntWritable I, IntWritable J) {
- return comparator.compare(rawBuffer, keyOffsets[I.get()], keyLengths[I.get()], rawBuffer, keyOffsets[J.get()], keyLengths[J.get()]);
- }
- }
-
- /** set the progressable object in order to report progress */
- public void setProgressable(Progressable progressable) {
- this.progressable = progressable;
- }
-
- } // MySequenceFile.Sorter.SortPass
-
- /** The interface to iterate over raw keys/values of SequenceFiles. */
- public static interface RawKeyValueIterator {
- /**
- * Gets the current raw key
- *
- * @return DataOutputBuffer
- * @throws IOException
- */
- DataOutputBuffer getKey() throws IOException;
-
- /**
- * Gets the current raw value
- *
- * @return ValueBytes
- * @throws IOException
- */
- ValueBytes getValue() throws IOException;
-
- /**
- * Sets up the current key and value (for getKey and getValue)
- *
- * @return true if there exists a key/value, false otherwise
- * @throws IOException
- */
- boolean next() throws IOException;
-
- /**
- * closes the iterator so that the underlying streams can be closed
- *
- * @throws IOException
- */
- void close() throws IOException;
-
- /**
- * Gets the Progress object; this has a float (0.0 - 1.0) indicating the bytes processed by the iterator so far
- */
- Progress getProgress();
- }
-
- /**
- * Merges the list of segments of type <code>SegmentDescriptor</code>
- *
- * @param segments
- * the list of SegmentDescriptors
- * @param tmpDir
- * the directory to write temporary files into
- * @return RawKeyValueIterator
- */
- public RawKeyValueIterator merge(List<SegmentDescriptor> segments, Path tmpDir) throws IOException {
- // pass in object to report progress, if present
- MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable);
- return mQueue.merge();
- }
-
- /**
- * Merges the contents of files passed in Path[] using a max factor value that is already set
- *
- * @param inNames
- * the array of path names
- * @param deleteInputs
- * true if the input files should be deleted when unnecessary
- * @param tmpDir
- * the directory to write temporary files into
- * @return RawKeyValueIterator
- */
- public RawKeyValueIterator merge(Path[] inNames, boolean deleteInputs, Path tmpDir) throws IOException {
- return merge(inNames, deleteInputs, (inNames.length < factor) ? inNames.length : factor, tmpDir);
- }
-
- /**
- * Merges the contents of files passed in Path[]
- *
- * @param inNames
- * the array of path names
- * @param deleteInputs
- * true if the input files should be deleted when unnecessary
- * @param factor
- * the factor that will be used as the maximum merge fan-in
- * @param tmpDir
- * the directory to write temporary files into
- * @return RawKeyValueIterator
- */
- public RawKeyValueIterator merge(Path[] inNames, boolean deleteInputs, int factor, Path tmpDir) throws IOException {
- // get the segments from inNames
- ArrayList<SegmentDescriptor> a = new ArrayList<SegmentDescriptor>();
- for (int i = 0; i < inNames.length; i++) {
- SegmentDescriptor s = new SegmentDescriptor(0, fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
- s.preserveInput(!deleteInputs);
- s.doSync();
- a.add(s);
- }
- this.factor = factor;
- MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable);
- return mQueue.merge();
- }
-
- /**
- * Merges the contents of files passed in Path[]
- *
- * @param inNames
- * the array of path names
- * @param tempDir
- * the directory for creating temp files during merge
- * @param deleteInputs
- * true if the input files should be deleted when unnecessary
- * @return RawKeyValueIterator
- */
- public RawKeyValueIterator merge(Path[] inNames, Path tempDir, boolean deleteInputs) throws IOException {
- // outFile will basically be used as prefix for temp files for the
- // intermediate merge outputs
- this.outFile = new Path(tempDir + Path.SEPARATOR + "merged");
- // get the segments from inNames
- ArrayList<SegmentDescriptor> a = new ArrayList<SegmentDescriptor>();
- for (int i = 0; i < inNames.length; i++) {
- SegmentDescriptor s = new SegmentDescriptor(0, fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
- s.preserveInput(!deleteInputs);
- s.doSync();
- a.add(s);
- }
- factor = (inNames.length < factor) ? inNames.length : factor;
- // pass in object to report progress, if present
- MergeQueue mQueue = new MergeQueue(a, tempDir, progressable);
- return mQueue.merge();
- }
-
- /**
- * Clones the attributes (like compression) of the input file and creates a corresponding Writer
- *
- * @param inputFile
- * the path of the input file whose attributes should be cloned
- * @param outputFile
- * the path of the output file
- * @param prog
- * the Progressable to report status during the file write
- * @return Writer
- */
- public Writer cloneFileAttributes(Path inputFile, Path outputFile, Progressable prog) throws IOException {
- FileSystem srcFileSys = inputFile.getFileSystem(conf);
- Reader reader = new Reader(srcFileSys, inputFile, 4096, conf, true);
- boolean compress = reader.isCompressed();
- boolean blockCompress = reader.isBlockCompressed();
- CompressionCodec codec = reader.getCompressionCodec();
- reader.close();
-
- Writer writer = createWriter(outputFile.getFileSystem(conf), conf, outputFile, keyClass, valClass, compress, blockCompress, codec, prog, new Metadata());
- return writer;
- }
-
- /**
- * Writes records from RawKeyValueIterator into a file represented by the passed writer
- *
- * @param records
- * the RawKeyValueIterator
- * @param writer
- * the Writer created earlier
- */
- public void writeFile(RawKeyValueIterator records, Writer writer) throws IOException {
- while (records.next()) {
- writer.appendRaw(records.getKey().getData(), 0, records.getKey().getLength(), records.getValue());
- }
- writer.sync();
- }
-
- /**
- * Merge the provided files.
- *
- * @param inFiles
- * the array of input path names
- * @param outFile
- * the final output file
- */
- public void merge(Path[] inFiles, Path outFile) throws IOException {
- if (fs.exists(outFile)) {
- throw new IOException("already exists: " + outFile);
- }
- RawKeyValueIterator r = merge(inFiles, false, outFile.getParent());
- Writer writer = cloneFileAttributes(inFiles[0], outFile, null);
-
- writeFile(r, writer);
-
- writer.close();
- }
-
- /** sort calls this to generate the final merged output */
- private int mergePass(Path tmpDir) throws IOException {
- LOG.debug("running merge pass");
- Writer writer = cloneFileAttributes(outFile.suffix(".0"), outFile, null);
- RawKeyValueIterator r = merge(outFile.suffix(".0"), outFile.suffix(".0.index"), tmpDir);
- writeFile(r, writer);
-
- writer.close();
- return 0;
- }
-
- /**
- * Used by mergePass to merge the output of the sort
- *
- * @param inName
- * the name of the input file containing sorted segments
- * @param indexIn
- * the offsets of the sorted segments
- * @param tmpDir
- * the relative directory to store intermediate results in
- * @return RawKeyValueIterator
- * @throws IOException
- */
- private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) throws IOException {
- // get the segments from indexIn
- // we create a SegmentContainer so that we can track segments belonging to
- // inName and delete inName as soon as we see that we have looked at all
- // the contained segments during the merge process & hence don't need
- // them anymore
- SegmentContainer container = new SegmentContainer(inName, indexIn);
- MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir, progressable);
- return mQueue.merge();
- }
-
- /** This class implements the core of the merge logic */
- private class MergeQueue extends PriorityQueue implements RawKeyValueIterator {
- private boolean compress;
- private boolean blockCompress;
- private DataOutputBuffer rawKey = new DataOutputBuffer();
- private ValueBytes rawValue;
- private long totalBytesProcessed;
- private float progPerByte;
- private Progress mergeProgress = new Progress();
- private Path tmpDir;
- private Progressable progress = null; // handle to the progress reporting object
- private SegmentDescriptor minSegment;
-
- // a TreeMap used to store the segments sorted by size (segment offset and
- // segment path name is used to break ties between segments of same sizes)
- private Map<SegmentDescriptor,Void> sortedSegmentSizes = new TreeMap<SegmentDescriptor,Void>();
-
- public void put(SegmentDescriptor stream) throws IOException {
- if (size() == 0) {
- compress = stream.in.isCompressed();
- blockCompress = stream.in.isBlockCompressed();
- } else if (compress != stream.in.isCompressed() || blockCompress != stream.in.isBlockCompressed()) {
- throw new IOException("All merged files must be compressed or not.");
- }
- super.put(stream);
- }
-
- /**
- * A queue of file segments to merge
- *
- * @param segments
- * the file segments to merge
- * @param tmpDir
- * a relative local directory to save intermediate files in
- * @param progress
- * the reference to the Progressable object
- */
- public MergeQueue(List<SegmentDescriptor> segments, Path tmpDir, Progressable progress) {
- int size = segments.size();
- for (int i = 0; i < size; i++) {
- sortedSegmentSizes.put(segments.get(i), null);
- }
- this.tmpDir = tmpDir;
- this.progress = progress;
- }
-
- protected boolean lessThan(Object a, Object b) {
- // indicate we're making progress
- if (progress != null) {
- progress.progress();
- }
- SegmentDescriptor msa = (SegmentDescriptor) a;
- SegmentDescriptor msb = (SegmentDescriptor) b;
- return comparator.compare(msa.getKey().getData(), 0, msa.getKey().getLength(), msb.getKey().getData(), 0, msb.getKey().getLength()) < 0;
- }
-
- public void close() throws IOException {
- SegmentDescriptor ms; // close inputs
- while ((ms = (SegmentDescriptor) pop()) != null) {
- ms.cleanup();
- }
- minSegment = null;
- }
-
- public DataOutputBuffer getKey() throws IOException {
- return rawKey;
- }
-
- public ValueBytes getValue() throws IOException {
- return rawValue;
- }
-
- public boolean next() throws IOException {
- if (size() == 0)
- return false;
- if (minSegment != null) {
- // minSegment is non-null for all invocations of next except the first
- // one. For the first invocation, the priority queue is ready for use
- // but for the subsequent invocations, first adjust the queue
- adjustPriorityQueue(minSegment);
- if (size() == 0) {
- minSegment = null;
- return false;
- }
- }
- minSegment = (SegmentDescriptor) top();
- long startPos = minSegment.in.getPosition(); // Current position in stream
- // save the raw key reference
- rawKey = minSegment.getKey();
- // load the raw value. Re-use the existing rawValue buffer
- if (rawValue == null) {
- rawValue = minSegment.in.createValueBytes();
- }
- minSegment.nextRawValue(rawValue);
- long endPos = minSegment.in.getPosition(); // End position after reading value
- updateProgress(endPos - startPos);
- return true;
- }
-
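
next() above implements one step of a k-way merge: take the segment with the smallest current key off the heap, emit its key/value, advance it, and restore the heap. The same idea in a self-contained form, merging sorted integer lists with java.util.PriorityQueue instead of segment descriptors:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.PriorityQueue;

    public class KWayMergeSketch {
      public static List<Integer> merge(List<List<Integer>> sortedInputs) {
        // Heap entries are {currentValue, inputIndex, positionInInput}.
        PriorityQueue<int[]> heap =
            new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        for (int i = 0; i < sortedInputs.size(); i++)
          if (!sortedInputs.get(i).isEmpty())
            heap.add(new int[] {sortedInputs.get(i).get(0), i, 0});

        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
          int[] top = heap.poll();              // smallest current element
          out.add(top[0]);
          List<Integer> src = sortedInputs.get(top[1]);
          int next = top[2] + 1;
          if (next < src.size())                // advance that input and re-insert
            heap.add(new int[] {src.get(next), top[1], next});
        }
        return out;
      }
    }
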
- public Progress getProgress() {
- return mergeProgress;
- }
-
- private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException {
- long startPos = ms.in.getPosition(); // Current position in stream
- boolean hasNext = ms.nextRawKey();
- long endPos = ms.in.getPosition(); // End position after reading key
- updateProgress(endPos - startPos);
- if (hasNext) {
- adjustTop();
- } else {
- pop();
- ms.cleanup();
- }
- }
-
- private void updateProgress(long bytesProcessed) {
- totalBytesProcessed += bytesProcessed;
- if (progPerByte > 0) {
- mergeProgress.set(totalBytesProcessed * progPerByte);
- }
- }
-
- /**
- * This is the single level merge that is called multiple times depending on the factor size and the number of segments
- *
- * @return RawKeyValueIterator
- */
- public RawKeyValueIterator merge() throws IOException {
- // create the MergeStreams from the sorted map created in the constructor
- // and dump the final output to a file
- int numSegments = sortedSegmentSizes.size();
- int origFactor = factor;
- int passNo = 1;
- LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
- do {
- // get the factor for this pass of merge
- factor = getPassFactor(passNo, numSegments);
- List<SegmentDescriptor> segmentsToMerge = new ArrayList<SegmentDescriptor>();
- int segmentsConsidered = 0;
- int numSegmentsToConsider = factor;
- while (true) {
- // extract the smallest 'factor' number of segment pointers from the
- // TreeMap. Call cleanup on the empty segments (no key/value data)
- SegmentDescriptor[] mStream = getSegmentDescriptors(numSegmentsToConsider);
- for (int i = 0; i < mStream.length; i++) {
- if (mStream[i].nextRawKey()) {
- segmentsToMerge.add(mStream[i]);
- segmentsConsidered++;
- // Count the fact that we read some bytes in calling nextRawKey()
- updateProgress(mStream[i].in.getPosition());
- } else {
- mStream[i].cleanup();
- numSegments--; // we ignore this segment for the merge
- }
- }
- // if we have the desired number of segments
- // or looked at all available segments, we break
- if (segmentsConsidered == factor || sortedSegmentSizes.size() == 0) {
- break;
- }
-
- numSegmentsToConsider = factor - segmentsConsidered;
- }
- // feed the streams to the priority queue
- initialize(segmentsToMerge.size());
- clear();
- for (int i = 0; i < segmentsToMerge.size(); i++) {
- put(segmentsToMerge.get(i));
- }
- // if we have lesser number of segments remaining, then just return the
- // iterator, else do another single level merge
- if (numSegments <= factor) {
- // calculate the length of the remaining segments. Required for
- // calculating the merge progress
- long totalBytes = 0;
- for (int i = 0; i < segmentsToMerge.size(); i++) {
- totalBytes += segmentsToMerge.get(i).segmentLength;
- }
- if (totalBytes != 0) // being paranoid
- progPerByte = 1.0f / (float) totalBytes;
- // reset factor to what it originally was
- factor = origFactor;
- return this;
- }
- // we want to spread the creation of temp files on multiple disks if
- // available under the space constraints
- long approxOutputSize = 0;
- for (SegmentDescriptor s : segmentsToMerge) {
- approxOutputSize += s.segmentLength + ChecksumFileSystem.getApproxChkSumLength(s.segmentLength);
- }
- Path tmpFilename = new Path(tmpDir, "intermediate").suffix("." + passNo);
-
- Path outputFile = lDirAlloc.getLocalPathForWrite(tmpFilename.toString(), approxOutputSize, conf);
- LOG.debug("writing intermediate results to " + outputFile);
- Writer writer = cloneFileAttributes(fs.makeQualified(segmentsToMerge.get(0).segmentPathName), fs.makeQualified(outputFile), null);
- writer.sync = null; // disable sync for temp files
- writeFile(this, writer);
- writer.close();
-
- // we finished one single level merge; now clean up the priority
- // queue
- this.close();
-
- SegmentDescriptor tempSegment = new SegmentDescriptor(0, fs.getFileStatus(outputFile).getLen(), outputFile);
- // put the segment back in the TreeMap
- sortedSegmentSizes.put(tempSegment, null);
- numSegments = sortedSegmentSizes.size();
- passNo++;
- // we are worried about only the first pass merge factor. So reset the
- // factor to what it originally was
- factor = origFactor;
- } while (true);
- }
-
- // Hadoop-591
- public int getPassFactor(int passNo, int numSegments) {
- if (passNo > 1 || numSegments <= factor || factor == 1)
- return factor;
- int mod = (numSegments - 1) % (factor - 1);
- if (mod == 0)
- return factor;
- return mod + 1;
- }
-
- /**
- * Return (& remove) the requested number of segment descriptors from the sorted map.
- */
- public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) {
- if (numDescriptors > sortedSegmentSizes.size())
- numDescriptors = sortedSegmentSizes.size();
- SegmentDescriptor[] SegmentDescriptors = new SegmentDescriptor[numDescriptors];
- Iterator iter = sortedSegmentSizes.keySet().iterator();
- int i = 0;
- while (i < numDescriptors) {
- SegmentDescriptors[i++] = (SegmentDescriptor) iter.next();
- iter.remove();
- }
- return SegmentDescriptors;
- }
- } // MySequenceFile.Sorter.MergeQueue
-
- /**
- * This class defines a merge segment. This class can be subclassed to provide a customized cleanup method implementation. In this implementation, cleanup
- * closes the file handle and deletes the file
- */
- public class SegmentDescriptor implements Comparable {
-
- long segmentOffset; // the start of the segment in the file
- long segmentLength; // the length of the segment
- Path segmentPathName; // the path name of the file containing the segment
- boolean ignoreSync = true; // set to true for temp files
- private Reader in = null;
- private DataOutputBuffer rawKey = null; // this will hold the current key
- private boolean preserveInput = false; // delete input segment files?
-
- /**
- * Constructs a segment
- *
- * @param segmentOffset
- * the offset of the segment in the file
- * @param segmentLength
- * the length of the segment
- * @param segmentPathName
- * the path name of the file containing the segment
- */
- public SegmentDescriptor(long segmentOffset, long segmentLength, Path segmentPathName) {
- this.segmentOffset = segmentOffset;
- this.segmentLength = segmentLength;
- this.segmentPathName = segmentPathName;
- }
-
- /** Do the sync checks */
- public void doSync() {
- ignoreSync = false;
- }
-
- /** Whether to delete the files when no longer needed */
- public void preserveInput(boolean preserve) {
- preserveInput = preserve;
- }
-
- public boolean shouldPreserveInput() {
- return preserveInput;
- }
-
- public int compareTo(Object o) {
- SegmentDescriptor that = (SegmentDescriptor) o;
- if (this.segmentLength != that.segmentLength) {
- return (this.segmentLength < that.segmentLength ? -1 : 1);
- }
- if (this.segmentOffset != that.segmentOffset) {
- return (this.segmentOffset < that.segmentOffset ? -1 : 1);
- }
- return (this.segmentPathName.toString()).compareTo(that.segmentPathName.toString());
- }
-
- public boolean equals(Object o) {
- if (!(o instanceof SegmentDescriptor)) {
- return false;
- }
- SegmentDescriptor that = (SegmentDescriptor) o;
- if (this.segmentLength == that.segmentLength && this.segmentOffset == that.segmentOffset
- && this.segmentPathName.toString().equals(that.segmentPathName.toString())) {
- return true;
- }
- return false;
- }
-
- public int hashCode() {
- return 37 * 17 + (int) (segmentOffset ^ (segmentOffset >>> 32));
- }
-
- /**
- * Fills up the rawKey object with the key returned by the Reader
- *
- * @return true if there is a key returned; false, otherwise
- */
- public boolean nextRawKey() throws IOException {
- if (in == null) {
- int bufferSize = conf.getInt("io.file.buffer.size", 4096);
- if (fs.getUri().getScheme().startsWith("ramfs")) {
- bufferSize = conf.getInt("io.bytes.per.checksum", 512);
- }
- Reader reader = new Reader(fs, segmentPathName, bufferSize, segmentOffset, segmentLength, conf, false);
-
- // sometimes we ignore syncs especially for temp merge files
- if (ignoreSync)
- reader.sync = null;
-
- if (reader.getKeyClass() != keyClass)
- throw new IOException("wrong key class: " + reader.getKeyClass() + " is not " + keyClass);
- if (reader.getValueClass() != valClass)
- throw new IOException("wrong value class: " + reader.getValueClass() + " is not " + valClass);
- this.in = reader;
- rawKey = new DataOutputBuffer();
- }
- rawKey.reset();
- int keyLength = in.nextRawKey(rawKey);
- return (keyLength >= 0);
- }
-
- /**
- * Fills up the passed rawValue with the value corresponding to the key read earlier
- *
- * @return the length of the value
- */
- public int nextRawValue(ValueBytes rawValue) throws IOException {
- int valLength = in.nextRawValue(rawValue);
- return valLength;
- }
-
- /** Returns the stored rawKey */
- public DataOutputBuffer getKey() {
- return rawKey;
- }
-
- /** closes the underlying reader */
- private void close() throws IOException {
- this.in.close();
- this.in = null;
- }
-
- /**
- * The default cleanup. Subclasses can override this with a custom cleanup
- */
- public void cleanup() throws IOException {
- close();
- if (!preserveInput) {
- fs.delete(segmentPathName, true);
- }
- }
- } // MySequenceFile.Sorter.SegmentDescriptor
-
- /**
- * This class provisions multiple segments contained within a single file
- */
- private class LinkedSegmentsDescriptor extends SegmentDescriptor {
-
- SegmentContainer parentContainer = null;
-
- /**
- * Constructs a segment
- *
- * @param segmentOffset
- * the offset of the segment in the file
- * @param segmentLength
- * the length of the segment
- * @param segmentPathName
- * the path name of the file containing the segment
- * @param parent
- * the parent SegmentContainer that holds the segment
- */
- public LinkedSegmentsDescriptor(long segmentOffset, long segmentLength, Path segmentPathName, SegmentContainer parent) {
- super(segmentOffset, segmentLength, segmentPathName);
- this.parentContainer = parent;
- }
-
- /**
- * The default cleanup. Subclasses can override this with a custom cleanup
- */
- public void cleanup() throws IOException {
- super.close();
- if (super.shouldPreserveInput())
- return;
- parentContainer.cleanup();
- }
- } // MySequenceFile.Sorter.LinkedSegmentsDescriptor
-
- /**
- * The class that defines a container for segments to be merged. Primarily required to delete temp files as soon as all the contained segments have been
- * looked at
- */
- private class SegmentContainer {
- private int numSegmentsCleanedUp = 0; // track the no. of segment cleanups
- private int numSegmentsContained; // # of segments contained
- private Path inName; // input file from where segments are created
-
- // the list of segments read from the file
- private ArrayList<SegmentDescriptor> segments = new ArrayList<SegmentDescriptor>();
-
- /**
- * This constructor is there primarily to serve the sort routine that generates a single output file with an associated index file
- */
- public SegmentContainer(Path inName, Path indexIn) throws IOException {
- // get the segments from indexIn
- FSDataInputStream fsIndexIn = fs.open(indexIn);
- long end = fs.getFileStatus(indexIn).getLen();
- while (fsIndexIn.getPos() < end) {
- long segmentOffset = WritableUtils.readVLong(fsIndexIn);
- long segmentLength = WritableUtils.readVLong(fsIndexIn);
- Path segmentName = inName;
- segments.add(new LinkedSegmentsDescriptor(segmentOffset, segmentLength, segmentName, this));
- }
- fsIndexIn.close();
- fs.delete(indexIn, true);
- numSegmentsContained = segments.size();
- this.inName = inName;
- }
-
- public List<SegmentDescriptor> getSegmentList() {
- return segments;
- }
-
- public void cleanup() throws IOException {
- numSegmentsCleanedUp++;
- if (numSegmentsCleanedUp == numSegmentsContained) {
- fs.delete(inName, true);
- }
- }
- } // MySequenceFile.Sorter.SegmentContainer
-
- } // MySequenceFile.Sorter
-
} // MySequenceFile
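
[Editor's reference, not part of the patch] The Sorter/MergeQueue machinery removed above implemented a multi-pass merge: the first pass merges just enough segments that every later pass can merge exactly 'factor' segments at a time. A minimal sketch of that pass-factor rule, mirroring the removed getPassFactor (HADOOP-591); the method name here is illustrative only:

    // Sketch of the removed Sorter.MergeQueue.getPassFactor logic.
    // Pass 1 merges (mod + 1) segments so that (numSegments - 1) becomes a
    // multiple of (factor - 1); every subsequent pass merges exactly 'factor'.
    static int passFactor(int passNo, int numSegments, int factor) {
      if (passNo > 1 || numSegments <= factor || factor == 1)
        return factor;
      int mod = (numSegments - 1) % (factor - 1);
      if (mod == 0)
        return factor;
      return mod + 1;
    }
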
Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/master/thrift/MasterClientService.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/master/thrift/MasterClientService.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/master/thrift/MasterClientService.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/master/thrift/MasterClientService.java Fri Dec 23 17:53:12 2011
@@ -2990,6 +2990,8 @@ import org.slf4j.LoggerFactory;
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bit_vector = new BitSet(1);
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
@@ -13069,6 +13071,8 @@ import org.slf4j.LoggerFactory;
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bit_vector = new BitSet(1);
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/MutationLogger.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/MutationLogger.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/MutationLogger.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/MutationLogger.java Fri Dec 23 17:53:12 2011
@@ -9319,6 +9319,8 @@ import org.slf4j.LoggerFactory;
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bit_vector = new BitSet(1);
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java Fri Dec 23 17:53:12 2011
@@ -9884,6 +9884,8 @@ import org.slf4j.LoggerFactory;
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bit_vector = new BitSet(1);
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
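
[Editor's reference, not part of the patch] The repeated two-line additions in the generated Thrift classes above all fix the same problem: Java deserialization builds the object without running the class's constructor or its field initializers, so __isset_bit_vector is still null when readObject starts and the subsequent read(...) would throw a NullPointerException. A minimal sketch of the pattern under that assumption; the class, field types, and stream framing below are illustrative, not the generated Thrift code:

    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    import java.util.BitSet;

    // Sketch only: shows why readObject must re-create fields that the
    // constructor and field initializers would normally set up.
    class Example implements Serializable {
      private static final long serialVersionUID = 1L;

      // Initialized here, but deserialization never runs this initializer.
      private BitSet __isset_bit_vector = new BitSet(1);
      private long value;

      private void writeObject(ObjectOutputStream out) throws IOException {
        out.writeLong(value); // custom framing, no defaultWriteObject()
      }

      private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        // Re-create the bit vector before decoding, as the patch does;
        // otherwise marking a field as set below would NPE.
        __isset_bit_vector = new BitSet(1);
        value = in.readLong();     // stand-in for read(new TCompactProtocol(...))
        __isset_bit_vector.set(0); // record that 'value' was set
      }
    }
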
Modified: incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/data/MapFileTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/data/MapFileTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/data/MapFileTest.java (original)
+++ incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/data/MapFileTest.java Fri Dec 23 17:53:12 2011
@@ -20,24 +20,19 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Random;
import java.util.Map.Entry;
+import java.util.Random;
import junit.framework.TestCase;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.map.MyMapFile;
-import org.apache.accumulo.core.file.map.MySequenceFile;
-import org.apache.accumulo.core.file.map.MySequenceFile.CompressionType;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.LocalityGroupUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
@@ -55,7 +50,7 @@ public class MapFileTest extends TestCas
/*****************************
* write out the test map file
*/
- MyMapFile.Writer mfw = new MyMapFile.Writer(conf, fs, "/tmp/testMapFileIndexingMap", Key.class, Value.class, CompressionType.BLOCK);
+ MapFile.Writer mfw = new MapFile.Writer(conf, fs, "/tmp/testMapFileIndexingMap", Key.class, Value.class);
Value value = new Value(new byte[10]);
for (int i = 0; i < 10; i++) {
Text row = new Text(String.format("%08d", i));
@@ -101,7 +96,7 @@ public class MapFileTest extends TestCas
/*****************************
* write out the test map file
*/
- MyMapFile.Writer mfw = new MyMapFile.Writer(conf, fs, "/tmp/testMapFileIndexingMap", Text.class, BytesWritable.class, CompressionType.BLOCK);
+ MapFile.Writer mfw = new MapFile.Writer(conf, fs, "/tmp/testMapFileIndexingMap", Text.class, BytesWritable.class);
Text key = new Text();
BytesWritable value;
Random r = new Random();
@@ -229,73 +224,11 @@ public class MapFileTest extends TestCas
}
}
- public void testMapFileFix() {
- try {
- Configuration conf = CachedConfiguration.getInstance();
- FileSystem fs = FileSystem.get(conf);
- conf.setInt("io.seqfile.compress.blocksize", 4000);
-
- for (CompressionType compressionType : CompressionType.values()) {
- /*****************************
- * write out the test map file
- */
- MyMapFile.Writer mfw = new MyMapFile.Writer(conf, fs, "/tmp/testMapFileIndexingMap", Text.class, BytesWritable.class, compressionType);
- BytesWritable value;
- Random r = new Random();
- byte[] bytes = new byte[1024];
- for (int i = 0; i < 1000; i++) {
- String keyString = Integer.toString(i + 1000000);
- Text key = new Text(keyString);
- r.nextBytes(bytes);
- value = new BytesWritable(bytes);
- mfw.append(key, value);
- }
- mfw.close();
-
- /************************************
- * move the index file
- */
- fs.rename(new Path("/tmp/testMapFileIndexingMap/index"), new Path("/tmp/testMapFileIndexingMap/oldIndex"));
-
- /************************************
- * recreate the index
- */
- MyMapFile.fix(fs, new Path("/tmp/testMapFileIndexingMap"), Text.class, BytesWritable.class, false, conf);
-
- /************************************
- * compare old and new indices
- */
- MySequenceFile.Reader oldIndexReader = new MySequenceFile.Reader(fs, new Path("/tmp/testMapFileIndexingMap/oldIndex"), conf);
- MySequenceFile.Reader newIndexReader = new MySequenceFile.Reader(fs, new Path("/tmp/testMapFileIndexingMap/index"), conf);
-
- Text oldKey = new Text();
- Text newKey = new Text();
- LongWritable oldValue = new LongWritable();
- LongWritable newValue = new LongWritable();
- while (true) {
- boolean moreKeys = false;
- // check for the same number of records
- assertTrue((moreKeys = oldIndexReader.next(oldKey, oldValue)) == newIndexReader.next(newKey, newValue));
- if (!moreKeys)
- break;
- assertTrue(oldKey.compareTo(newKey) == 0);
- assertTrue(oldValue.compareTo(newValue) == 0);
- }
- oldIndexReader.close();
- newIndexReader.close();
-
- fs.delete(new Path("/tmp/testMapFileIndexingMap"), true);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
public static void main(String[] args) {
try {
Configuration conf = CachedConfiguration.getInstance();
FileSystem fs = FileSystem.get(conf);
- MyMapFile.Writer mfw = new MyMapFile.Writer(conf, fs, "/tmp/testMapFileIndexingMap", Text.class, BytesWritable.class, CompressionType.BLOCK);
+ MapFile.Writer mfw = new MapFile.Writer(conf, fs, "/tmp/testMapFileIndexingMap", Text.class, BytesWritable.class);
Text key = new Text();
BytesWritable value;
Random r = new Random();
Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Fri Dec 23 17:53:12 2011
@@ -208,10 +208,10 @@ public class BulkImport extends MasterRe
}
} else {
// assume it is a map file
- extension = MyMapFile.EXTENSION;
+ extension = Constants.MAPFILE_EXTENSION;
}
- if (extension.equals(MyMapFile.EXTENSION)) {
+ if (extension.equals(Constants.MAPFILE_EXTENSION)) {
if (!fileStatus.isDir()) {
log.warn(fileStatus.getPath() + " is not a map file, ignoring");
continue;