Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/06/18 23:59:38 UTC
svn commit: r548505 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/io/compress/
src/java/org/apache/hadoop/io/compress/zlib/
src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/io/compress/
src/test/or...
Author: cutting
Date: Mon Jun 18 14:59:36 2007
New Revision: 548505
URL: http://svn.apache.org/viewvc?view=rev&rev=548505
Log:
HADOOP-1193. Pool allocation of compression codecs. Contributed by Arun.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/BigMapOutput.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=548505&r1=548504&r2=548505
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Jun 18 14:59:36 2007
@@ -164,6 +164,11 @@
files, so that disk space, writability, etc. is considered.
(Dhruba Borthakur via cutting)
+ 52. HADOOP-1193. Pool allocation of compression codecs. This
+ eliminates a memory leak that could cause OutOfMemoryException,
+ and also substantially improves performance.
+ (Arun C Murthy via cutting)
+
Release 0.13.0 - 2007-06-08
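
[The gist of the change: SequenceFile Writers and Readers now borrow their
(possibly native) Compressor/Decompressor instances from a static pool and
return them on close(), instead of constructing a new codec per file. A
minimal sketch of that lifecycle, condensed from the Writer changes below:

    // Borrow from the pool; fall back to constructing a new codec if empty.
    Compressor compressor = compressorPool.getCodec(codec.getCompressorType());
    if (compressor == null) {
      compressor = codec.createCompressor();
    }
    CompressionOutputStream deflateFilter =
        codec.createOutputStream(buffer, compressor);
    // ... write records through deflateFilter ...
    compressorPool.returnCodec(compressor);  // in close(): recycle for the next Writer
]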
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=548505&r1=548504&r2=548505
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Mon Jun 18 14:59:36 2007
@@ -21,8 +21,6 @@
import java.io.*;
import java.util.*;
import java.net.InetAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
import java.rmi.server.UID;
import java.security.MessageDigest;
import org.apache.commons.logging.*;
@@ -30,6 +28,8 @@
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
@@ -293,7 +293,7 @@
Class keyClass, Class valClass, boolean compress, boolean blockCompress,
CompressionCodec codec, Metadata metadata)
throws IOException {
- if ((codec instanceof GzipCodec) &&
+ if (codec != null && (codec instanceof GzipCodec) &&
!NativeCodeLoader.isNativeCodeLoaded() &&
!ZlibFactory.isNativeZlibLoaded()) {
throw new IllegalArgumentException("SequenceFile doesn't work with " +
@@ -315,25 +315,47 @@
/**
* Construct the preferred type of 'raw' SequenceFile Writer.
- * @param out The stream on top which the writer is to be constructed.
+ * @param fs The configured filesystem.
+ * @param conf The configuration.
+ * @param file The name of the file.
* @param keyClass The 'key' type.
* @param valClass The 'value' type.
* @param compress Compress data?
* @param blockCompress Compress blocks?
+ * @param codec The compression codec.
+ * @param progress The Progressable object to track progress.
+ * @param metadata The metadata of the file.
* @return Returns the handle to the constructed SequenceFile Writer.
* @throws IOException
*/
private static Writer
- createWriter(Configuration conf, FSDataOutputStream out,
- Class keyClass, Class valClass, boolean compress, boolean blockCompress,
- CompressionCodec codec)
- throws IOException {
- Writer writer = createWriter(conf, out, keyClass, valClass, compress,
- blockCompress, codec, new Metadata());
- return writer;
+ createWriter(FileSystem fs, Configuration conf, Path file,
+ Class keyClass, Class valClass,
+ boolean compress, boolean blockCompress,
+ CompressionCodec codec, Progressable progress, Metadata metadata)
+ throws IOException {
+ if (codec != null && (codec instanceof GzipCodec) &&
+ !NativeCodeLoader.isNativeCodeLoaded() &&
+ !ZlibFactory.isNativeZlibLoaded()) {
+ throw new IllegalArgumentException("SequenceFile doesn't work with " +
+ "GzipCodec without native-hadoop code!");
}
+ Writer writer = null;
+
+ if (!compress) {
+ writer = new Writer(fs, conf, file, keyClass, valClass, progress, metadata);
+ } else if (compress && !blockCompress) {
+ writer = new RecordCompressWriter(fs, conf, file, keyClass, valClass,
+ codec, progress, metadata);
+ } else {
+ writer = new BlockCompressWriter(fs, conf, file, keyClass, valClass,
+ codec, progress, metadata);
+ }
+ return writer;
+}
+
/**
* Construct the preferred type of 'raw' SequenceFile Writer.
* @param conf The configuration.
@@ -598,8 +620,16 @@
/** Write key/value pairs to a sequence-format file. */
public static class Writer {
+ /**
+ * A global compressor pool used to save the expensive
+ * construction/destruction of (possibly native) compression codecs.
+ */
+ private static final CodecPool<Compressor> compressorPool =
+ new CodecPool<Compressor>();
+
Configuration conf;
FSDataOutputStream out;
+ boolean ownOutputStream = true;
DataOutputBuffer buffer = new DataOutputBuffer();
Class keyClass;
@@ -610,7 +640,8 @@
CompressionOutputStream deflateFilter = null;
DataOutputStream deflateOut = null;
Metadata metadata = null;
-
+ Compressor compressor = null;
+
// Insert a globally unique 16-byte value every few entries, so that one
// can seek into the middle of a file and then synchronize with record
// starts and ends by scanning for this value.
@@ -651,6 +682,7 @@
private Writer(Configuration conf, FSDataOutputStream out,
Class keyClass, Class valClass, Metadata metadata)
throws IOException {
+ this.ownOutputStream = false;
init(null, conf, out, keyClass, valClass, false, null, metadata);
initializeFileHeader();
@@ -703,7 +735,11 @@
this.metadata = metadata;
if (this.codec != null) {
ReflectionUtils.setConf(this.codec, this.conf);
- this.deflateFilter = this.codec.createOutputStream(buffer);
+ compressor = compressorPool.getCodec(this.codec.getCompressorType());
+ if (compressor == null) {
+ compressor = this.codec.createCompressor();
+ }
+ this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
this.deflateOut =
new DataOutputStream(new BufferedOutputStream(deflateFilter));
}
@@ -727,8 +763,15 @@
/** Close the file. */
public synchronized void close() throws IOException {
+ compressorPool.returnCodec(compressor);
+
if (out != null) {
- out.close();
+ out.flush();
+
+ // Close the underlying stream iff we own it...
+ if (ownOutputStream) {
+ out.close();
+ }
out = null;
}
}
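
[Note the new ownOutputStream flag: a Writer constructed over a
caller-supplied FSDataOutputStream now flushes but no longer closes it, so
the caller must close the stream itself. A hedged sketch (path and variable
names are illustrative) using the public stream-based factory, as MapTask
does further below:

    FSDataOutputStream out = fs.create(path);              // caller-owned stream
    SequenceFile.Writer writer =
        SequenceFile.createWriter(conf, out, keyClass, valClass,
                                  compressionType, codec);
    // ... writer.append(key, value) ...
    writer.close();  // returns the pooled compressor and flushes, but leaves 'out' open
    out.close();     // the caller still owns, and closes, the underlying stream
]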
@@ -849,6 +892,7 @@
private RecordCompressWriter(Configuration conf, FSDataOutputStream out,
Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
throws IOException {
+ this.ownOutputStream = false;
super.init(null, conf, out, keyClass, valClass, true, codec, metadata);
initializeFileHeader();
@@ -967,6 +1011,7 @@
private BlockCompressWriter(Configuration conf, FSDataOutputStream out,
Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
throws IOException {
+ this.ownOutputStream = false;
super.init(null, conf, out, keyClass, valClass, true, codec, metadata);
init(1000000);
@@ -1035,9 +1080,8 @@
public synchronized void close() throws IOException {
if (out != null) {
writeBlock();
- out.close();
- out = null;
}
+ super.close();
}
public void sync() throws IOException {
@@ -1107,6 +1151,13 @@
/** Reads key/value pairs from a sequence-format file. */
public static class Reader {
+ /**
+ * A global decompressor pool used to save the expensive
+ * construction/destruction of (possibly native) decompression codecs.
+ */
+ private static final CodecPool<Decompressor> decompressorPool =
+ new CodecPool<Decompressor>();
+
private Path file;
private FSDataInputStream in;
private DataOutputBuffer outBuf = new DataOutputBuffer();
@@ -1142,43 +1193,62 @@
private DataInputBuffer keyLenBuffer = null;
private CompressionInputStream keyLenInFilter = null;
private DataInputStream keyLenIn = null;
+ private Decompressor keyLenDecompressor = null;
private DataInputBuffer keyBuffer = null;
private CompressionInputStream keyInFilter = null;
private DataInputStream keyIn = null;
+ private Decompressor keyDecompressor = null;
private DataInputBuffer valLenBuffer = null;
private CompressionInputStream valLenInFilter = null;
private DataInputStream valLenIn = null;
+ private Decompressor valLenDecompressor = null;
private DataInputBuffer valBuffer = null;
private CompressionInputStream valInFilter = null;
private DataInputStream valIn = null;
+ private Decompressor valDecompressor = null;
/** Open the named file. */
public Reader(FileSystem fs, Path file, Configuration conf)
throws IOException {
- this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf);
+ this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf, false);
}
- private Reader(FileSystem fs, Path name, int bufferSize,
- Configuration conf) throws IOException {
- this.file = name;
- this.in = fs.open(file, bufferSize);
- this.end = fs.getLength(file);
- this.conf = conf;
- init();
+ private Reader(FileSystem fs, Path file, int bufferSize,
+ Configuration conf, boolean tempReader) throws IOException {
+ this(fs, file, bufferSize, 0, fs.getLength(file), conf, tempReader);
}
private Reader(FileSystem fs, Path file, int bufferSize, long start,
- long length, Configuration conf) throws IOException {
+ long length, Configuration conf, boolean tempReader)
+ throws IOException {
this.file = file;
this.in = fs.open(file, bufferSize);
this.conf = conf;
seek(start);
this.end = in.getPos() + length;
- init();
+ init(tempReader);
+ }
+
+ private Decompressor getPooledOrNewDecompressor() {
+ Decompressor decompressor = null;
+ decompressor = decompressorPool.getCodec(codec.getDecompressorType());
+ if (decompressor == null) {
+ decompressor = codec.createDecompressor();
+ }
+ return decompressor;
}
- private void init() throws IOException {
+
+ /**
+ * Initialize the {@link Reader}
+ * @param tempReader <code>true</code> if we are constructing a temporary
+ * reader {@link SequenceFile.Sorter.cloneFileAttributes},
+ * and hence do not initialize every component;
+ * <code>false</code> otherwise.
+ * @throws IOException
+ */
+ private void init(boolean tempReader) throws IOException {
byte[] versionBlock = new byte[VERSION.length];
in.readFully(versionBlock);
@@ -1245,33 +1315,48 @@
in.readFully(sync); // read sync bytes
}
- // Initialize
- valBuffer = new DataInputBuffer();
- if (decompress) {
- valInFilter = this.codec.createInputStream(valBuffer);
- valIn = new DataInputStream(valInFilter);
- } else {
- valIn = valBuffer;
- }
-
- if (blockCompressed) {
- keyLenBuffer = new DataInputBuffer();
- keyBuffer = new DataInputBuffer();
- valLenBuffer = new DataInputBuffer();
-
- keyLenInFilter = this.codec.createInputStream(keyLenBuffer);
- keyLenIn = new DataInputStream(keyLenInFilter);
-
- keyInFilter = this.codec.createInputStream(keyBuffer);
- keyIn = new DataInputStream(keyInFilter);
+ // Initialize... *not* if we are constructing a temporary Reader
+ if (!tempReader) {
+ valBuffer = new DataInputBuffer();
+ if (decompress) {
+ valDecompressor = getPooledOrNewDecompressor();
+ valInFilter = codec.createInputStream(valBuffer, valDecompressor);
+ valIn = new DataInputStream(valInFilter);
+ } else {
+ valIn = valBuffer;
+ }
- valLenInFilter = this.codec.createInputStream(valLenBuffer);
- valLenIn = new DataInputStream(valLenInFilter);
+ if (blockCompressed) {
+ keyLenBuffer = new DataInputBuffer();
+ keyBuffer = new DataInputBuffer();
+ valLenBuffer = new DataInputBuffer();
+
+ keyLenDecompressor = getPooledOrNewDecompressor();
+ keyLenInFilter = codec.createInputStream(keyLenBuffer,
+ keyLenDecompressor);
+ keyLenIn = new DataInputStream(keyLenInFilter);
+
+ keyDecompressor = getPooledOrNewDecompressor();
+ keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
+ keyIn = new DataInputStream(keyInFilter);
+
+ valLenDecompressor = getPooledOrNewDecompressor();
+ valLenInFilter = codec.createInputStream(valLenBuffer,
+ valLenDecompressor);
+ valLenIn = new DataInputStream(valLenInFilter);
+ }
}
}
/** Close the file. */
public synchronized void close() throws IOException {
+ // Return the decompressors to the pool
+ decompressorPool.returnCodec(keyLenDecompressor);
+ decompressorPool.returnCodec(keyDecompressor);
+ decompressorPool.returnCodec(valLenDecompressor);
+ decompressorPool.returnCodec(valDecompressor);
+
+ // Close the input-stream
in.close();
}
@@ -1755,6 +1840,49 @@
}
+ private static class CodecPool<T> {
+
+ private Map<Class, List<T>> pool = new HashMap<Class, List<T>>();
+
+ public T getCodec(Class codecClass) {
+ T codec = null;
+
+ // Check if an appropriate codec is available
+ synchronized (pool) {
+ if (pool.containsKey(codecClass)) {
+ List<T> codecList = pool.get(codecClass);
+
+ if (codecList != null) {
+ synchronized (codecList) {
+ if (!codecList.isEmpty()) {
+ codec = codecList.remove(0);
+ }
+ }
+ }
+ }
+ }
+
+ return codec;
+ }
+
+ public void returnCodec(T codec) {
+ if (codec != null) {
+ Class codecClass = codec.getClass();
+ synchronized (pool) {
+ if (!pool.containsKey(codecClass)) {
+ pool.put(codecClass, new ArrayList<T>());
+ }
+
+ List<T> codecList = pool.get(codecClass);
+ synchronized (codecList) {
+ codecList.add(codec);
+ }
+ }
+ }
+ }
+
+ }
+
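[The pool itself is a map from codec class to a free list, with nested
synchronization: the map lookup is guarded first, then the list mutation.
getCodec() returns null when nothing is cached, which is why callers fall
back to createCompressor()/createDecompressor(). A usage sketch, illustrative
only since CodecPool is private to SequenceFile:

    CodecPool<Decompressor> pool = new CodecPool<Decompressor>();
    Decompressor d = pool.getCodec(codec.getDecompressorType());
    if (d == null) {
      d = codec.createDecompressor();   // nothing cached yet for this class
    }
    // ... use d via codec.createInputStream(in, d) ...
    pool.returnCodec(d);                // cache it for the next Reader
]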
/** Sorts key/value pairs in a sequence-format file.
*
* <p>For best performance, applications should make sure that the {@link
@@ -1951,7 +2079,7 @@
}
continue;
}
- //int length = buffer.getLength() - start;
+
int keyLength = rawKeys.getLength() - keyOffset;
if (count == keyOffsets.length)
@@ -2026,7 +2154,8 @@
long segmentStart = out.getPos();
Writer writer = createWriter(conf, out, keyClass, valClass,
- isCompressed, isBlockCompressed, codec);
+ isCompressed, isBlockCompressed, codec,
+ new Metadata());
if (!done) {
writer.sync = null; // disable sync on temp files
@@ -2036,14 +2165,12 @@
int p = pointers[i];
writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
}
- writer.sync();
- writer.out.flush();
-
+ writer.close();
if (!done) {
// Save the segment length
WritableUtils.writeVLong(indexOut, segmentStart);
- WritableUtils.writeVLong(indexOut, (writer.out.getPos()-segmentStart));
+ WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart));
indexOut.flush();
}
}
@@ -2179,24 +2306,6 @@
/**
* Clones the attributes (like compression) of the input file and creates a
* corresponding Writer
- * @param ignoredFileSys the (ignored) FileSystem object
- * @param inputFile the path of the input file whose attributes should be
- * cloned
- * @param outputFile the path of the output file
- * @param prog the Progressable to report status during the file write
- * @return Writer
- * @throws IOException
- * @deprecated call #cloneFileAttributes(Path,Path,Progressable) instead
- */
- public Writer cloneFileAttributes(FileSystem ignoredFileSys,
- Path inputFile, Path outputFile, Progressable prog)
- throws IOException {
- return cloneFileAttributes(inputFile, outputFile, prog);
- }
-
- /**
- * Clones the attributes (like compression) of the input file and creates a
- * corresponding Writer
* @param inputFile the path of the input file whose attributes should be
* cloned
* @param outputFile the path of the output file
@@ -2205,24 +2314,19 @@
* @throws IOException
*/
public Writer cloneFileAttributes(Path inputFile, Path outputFile,
- Progressable prog) throws IOException {
+ Progressable prog)
+ throws IOException {
FileSystem srcFileSys = inputFile.getFileSystem(conf);
- Reader reader = new Reader(srcFileSys, inputFile, 4096, conf);
+ Reader reader = new Reader(srcFileSys, inputFile, 4096, conf, true);
boolean compress = reader.isCompressed();
boolean blockCompress = reader.isBlockCompressed();
CompressionCodec codec = reader.getCompressionCodec();
reader.close();
-
- FileSystem dstFileSys = outputFile.getFileSystem(conf);
- FSDataOutputStream out;
- if (prog != null)
- out = dstFileSys.create(outputFile, true,
- conf.getInt("io.file.buffer.size", 4096), prog);
- else
- out = dstFileSys.create(outputFile, true,
- conf.getInt("io.file.buffer.size", 4096));
- Writer writer = createWriter(conf, out, keyClass, valClass, compress,
- blockCompress, codec);
+
+ Writer writer = createWriter(outputFile.getFileSystem(conf), conf,
+ outputFile, keyClass, valClass, compress,
+ blockCompress, codec, prog,
+ new Metadata());
return writer;
}
@@ -2457,7 +2561,7 @@
Path outputFile = lDirAlloc.getLocalPathForWrite(
tmpFilename.toString(),
approxOutputSize, conf);
- LOG.info("writing intermediate results to " + outputFile);
+ LOG.debug("writing intermediate results to " + outputFile);
Writer writer = cloneFileAttributes(
fs.makeQualified(segmentsToMerge.get(0).segmentPathName),
fs.makeQualified(outputFile), null);
@@ -2590,7 +2694,7 @@
}
Reader reader = new Reader(fs, segmentPathName,
bufferSize, segmentOffset,
- segmentLength, conf);
+ segmentLength, conf, false);
//sometimes we ignore syncs especially for temp merge files
if (ignoreSync) reader.sync = null;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java?view=diff&rev=548505&r1=548504&r2=548505
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java Mon Jun 18 14:59:36 2007
@@ -29,19 +29,79 @@
public interface CompressionCodec {
/**
- * Create a stream compressor that will write to the given output stream.
+ * Create a {@link CompressionOutputStream} that will write to the given
+ * {@link OutputStream}.
+ *
* @param out the location for the final output stream
- * @return a stream the user can write uncompressed data to
+ * @return a stream the user can write uncompressed data to have it compressed
+ * @throws IOException
*/
CompressionOutputStream createOutputStream(OutputStream out)
- throws IOException;
+ throws IOException;
+
+ /**
+ * Create a {@link CompressionOutputStream} that will write to the given
+ * {@link OutputStream} with the given {@link Compressor}.
+ *
+ * @param out the location for the final output stream
+ * @param compressor compressor to use
+ * @return a stream the user can write uncompressed data to have it compressed
+ * @throws IOException
+ */
+ CompressionOutputStream createOutputStream(OutputStream out,
+ Compressor compressor)
+ throws IOException;
+
+ /**
+ * Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
+ *
+ * @return the type of compressor needed by this codec.
+ */
+ Class getCompressorType();
+
+ /**
+ * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
+ *
+ * @return a new compressor for use by this codec
+ */
+ Compressor createCompressor();
/**
* Create a stream decompressor that will read from the given input stream.
+ *
* @param in the stream to read compressed bytes from
* @return a stream to read uncompressed bytes from
+ * @throws IOException
*/
CompressionInputStream createInputStream(InputStream in) throws IOException;
+
+ /**
+ * Create a {@link CompressionInputStream} that will read from the given
+ * {@link InputStream} with the given {@link Decompressor}.
+ *
+ * @param in the stream to read compressed bytes from
+ * @param decompressor decompressor to use
+ * @return a stream to read uncompressed bytes from
+ * @throws IOException
+ */
+ CompressionInputStream createInputStream(InputStream in,
+ Decompressor decompressor)
+ throws IOException;
+
+
+ /**
+ * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
+ *
+ * @return the type of decompressor needed by this codec.
+ */
+ Class getDecompressorType();
+
+ /**
+ * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
+ *
+ * @return a new decompressor for use by this codec
+ */
+ Decompressor createDecompressor();
/**
* Get the default filename extension for this kind of compression.
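
[With the extended interface, codec-agnostic code can supply its own
(typically pooled) compressor or decompressor instead of letting the codec
construct one per stream. A minimal sketch against the new methods, assuming
codec, rawOut and rawIn are in scope:

    Compressor compressor = codec.createCompressor();       // or borrow from a pool
    CompressionOutputStream cOut = codec.createOutputStream(rawOut, compressor);
    cOut.write(data);                                       // uncompressed in, compressed out
    cOut.close();

    Decompressor decompressor = codec.createDecompressor(); // or borrow from a pool
    CompressionInputStream cIn = codec.createInputStream(rawIn, decompressor);
]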
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java?view=diff&rev=548505&r1=548504&r2=548505
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java Mon Jun 18 14:59:36 2007
@@ -29,7 +29,7 @@
public class DefaultCodec implements Configurable, CompressionCodec {
Configuration conf;
-
+
public void setConf(Configuration conf) {
this.conf = conf;
}
@@ -38,33 +38,50 @@
return conf;
}
- /**
- * Create a stream compressor that will write to the given output stream.
- * @param out the location for the final output stream
- * @return a stream the user can write uncompressed data to
- */
public CompressionOutputStream createOutputStream(OutputStream out)
- throws IOException {
- return new CompressorStream(out, ZlibFactory.getZlibCompressor(),
+ throws IOException {
+ return new CompressorStream(out, createCompressor(),
conf.getInt("io.file.buffer.size", 4*1024));
}
-
- /**
- * Create a stream decompressor that will read from the given input stream.
- * @param in the stream to read compressed bytes from
- * @return a stream to read uncompressed bytes from
- */
+
+ public CompressionOutputStream createOutputStream(OutputStream out,
+ Compressor compressor)
+ throws IOException {
+ return new CompressorStream(out, compressor,
+ conf.getInt("io.file.buffer.size", 4*1024));
+ }
+
+ public Class getCompressorType() {
+ return ZlibFactory.getZlibCompressorType();
+ }
+
+ public Compressor createCompressor() {
+ return ZlibFactory.getZlibCompressor();
+ }
+
public CompressionInputStream createInputStream(InputStream in)
- throws IOException {
- return new DecompressorStream(in, ZlibFactory.getZlibDecompressor(),
+ throws IOException {
+ return new DecompressorStream(in, createDecompressor(),
+ conf.getInt("io.file.buffer.size", 4*1024));
+ }
+
+ public CompressionInputStream createInputStream(InputStream in,
+ Decompressor decompressor)
+ throws IOException {
+ return new DecompressorStream(in, decompressor,
conf.getInt("io.file.buffer.size", 4*1024));
}
+
+ public Class getDecompressorType() {
+ return ZlibFactory.getZlibDecompressorType();
+ }
+
+ public Decompressor createDecompressor() {
+ return ZlibFactory.getZlibDecompressor();
+ }
- /**
- * Get the default filename extension for this kind of compression.
- * @return the extension including the '.'
- */
public String getDefaultExtension() {
return ".deflate";
}
+
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java?view=diff&rev=548505&r1=548504&r2=548505
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java Mon Jun 18 14:59:36 2007
@@ -135,58 +135,68 @@
}
}
- /**
- * Create a stream compressor that will write to the given output stream.
- * @param out the location for the final output stream
- * @return a stream the user can write uncompressed data to
- */
public CompressionOutputStream createOutputStream(OutputStream out)
throws IOException {
- CompressionOutputStream compOutStream = null;
-
- if (ZlibFactory.isNativeZlibLoaded()) {
- Compressor compressor =
- new ZlibCompressor(ZlibCompressor.CompressionLevel.DEFAULT_COMPRESSION,
- ZlibCompressor.CompressionStrategy.DEFAULT_STRATEGY,
- ZlibCompressor.CompressionHeader.GZIP_FORMAT,
- 64*1024);
-
- compOutStream = new CompressorStream(out, compressor,
- conf.getInt("io.file.buffer.size", 4*1024));
- } else {
- compOutStream = new GzipOutputStream(out);
- }
-
- return compOutStream;
+ return (ZlibFactory.isNativeZlibLoaded()) ?
+ new CompressorStream(out, createCompressor(),
+ conf.getInt("io.file.buffer.size", 4*1024)) :
+ new GzipOutputStream(out);
}
- /**
- * Create a stream decompressor that will read from the given input stream.
- * @param in the stream to read compressed bytes from
- * @return a stream to read uncompressed bytes from
- */
+ public CompressionOutputStream createOutputStream(OutputStream out,
+ Compressor compressor)
+ throws IOException {
+ return (compressor != null) ?
+ new CompressorStream(out, compressor,
+ conf.getInt("io.file.buffer.size",
+ 4*1024)) :
+ createOutputStream(out);
+
+ }
+
+ public Compressor createCompressor() {
+ return (ZlibFactory.isNativeZlibLoaded()) ?
+ new ZlibCompressor(ZlibCompressor.CompressionLevel.DEFAULT_COMPRESSION,
+ ZlibCompressor.CompressionStrategy.DEFAULT_STRATEGY,
+ ZlibCompressor.CompressionHeader.GZIP_FORMAT,
+ 64*1024) :
+ null;
+ }
+
+ public Class getCompressorType() {
+ return ZlibFactory.getZlibCompressorType();
+ }
+
public CompressionInputStream createInputStream(InputStream in)
- throws IOException {
- CompressionInputStream compInStream = null;
-
- if (ZlibFactory.isNativeZlibLoaded()) {
- Decompressor decompressor =
- new ZlibDecompressor(ZlibDecompressor.CompressionHeader.AUTODETECT_GZIP_ZLIB,
- 64*1024);
-
- compInStream = new DecompressorStream(in, decompressor,
- conf.getInt("io.file.buffer.size", 4*1024));
- } else {
- compInStream = new GzipInputStream(in);
- }
-
- return compInStream;
+ throws IOException {
+ return (ZlibFactory.isNativeZlibLoaded()) ?
+ new DecompressorStream(in, createDecompressor(),
+ conf.getInt("io.file.buffer.size",
+ 4*1024)) :
+ new GzipInputStream(in);
}
-
- /**
- * Get the default filename extension for this kind of compression.
- * @return the extension including the '.'
- */
+
+ public CompressionInputStream createInputStream(InputStream in,
+ Decompressor decompressor)
+ throws IOException {
+ return (decompressor != null) ?
+ new DecompressorStream(in, decompressor,
+ conf.getInt("io.file.buffer.size",
+ 4*1024)) :
+ createInputStream(in);
+ }
+
+ public Decompressor createDecompressor() {
+ return (ZlibFactory.isNativeZlibLoaded()) ?
+ new ZlibDecompressor(ZlibDecompressor.CompressionHeader.AUTODETECT_GZIP_ZLIB,
+ 64*1024) :
+ null;
+ }
+
+ public Class getDecompressorType() {
+ return ZlibFactory.getZlibDecompressorType();
+ }
+
public String getDefaultExtension() {
return ".gz";
}
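
[Note the fallback behaviour here: without native-hadoop,
createCompressor()/createDecompressor() return null, and the two-argument
factories quietly degrade to the pure-Java GzipOutputStream/GzipInputStream
path, so pooled and unpooled callers share one code path:

    Compressor c = gzipCodec.createCompressor();      // null when native zlib is absent
    CompressionOutputStream out =
        gzipCodec.createOutputStream(fileOut, c);     // uses GzipOutputStream if c == null
]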
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java?view=diff&rev=548505&r1=548504&r2=548505
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java Mon Jun 18 14:59:36 2007
@@ -41,7 +41,7 @@
private static final Log LOG = LogFactory.getLog(LzoCodec.class.getName());
private Configuration conf;
-
+
public void setConf(Configuration conf) {
this.conf = conf;
}
@@ -81,7 +81,7 @@
throws IOException {
// Ensure native-lzo library is loaded & initialized
if (!isNativeLzoLoaded()) {
- throw new IOException("native-lzo library not available");
+ throw new RuntimeException("native-lzo library not available");
}
/**
@@ -119,12 +119,66 @@
} else {
compressionOverhead = (int)(((bufferSize - (128 + 3)) * 8.0) / 9.0);
}
-
+
return new BlockCompressorStream(out,
new LzoCompressor(strategy, bufferSize),
bufferSize, compressionOverhead);
}
+ public CompressionOutputStream createOutputStream(OutputStream out,
+ Compressor compressor)
+ throws IOException {
+ // Ensure native-lzo library is loaded & initialized
+ if (!isNativeLzoLoaded()) {
+ throw new RuntimeException("native-lzo library not available");
+ }
+
+ LzoCompressor.CompressionStrategy strategy =
+ LzoCompressor.CompressionStrategy.valueOf(
+ conf.get("io.compression.codec.lzo.compressor",
+ LzoCompressor.CompressionStrategy.LZO1X_1.name()
+ )
+ );
+ int bufferSize = conf.getInt("io.compression.codec.lzo.buffersize",
+ 64*1024);
+ int compressionOverhead = 0;
+ if (strategy.name().contains("LZO1")) {
+ compressionOverhead = (int)(((bufferSize - (64 + 3)) * 16.0) / 17.0);
+ } else {
+ compressionOverhead = (int)(((bufferSize - (128 + 3)) * 8.0) / 9.0);
+ }
+
+ return new BlockCompressorStream(out, compressor, bufferSize,
+ compressionOverhead);
+ }
+
+ public Class getCompressorType() {
+ // Ensure native-lzo library is loaded & initialized
+ if (!isNativeLzoLoaded()) {
+ throw new RuntimeException("native-lzo library not available");
+ }
+
+ return LzoCompressor.class;
+ }
+
+ public Compressor createCompressor() {
+ // Ensure native-lzo library is loaded & initialized
+ if (!isNativeLzoLoaded()) {
+ throw new RuntimeException("native-lzo library not available");
+ }
+
+ LzoCompressor.CompressionStrategy strategy =
+ LzoCompressor.CompressionStrategy.valueOf(
+ conf.get("io.compression.codec.lzo.compressor",
+ LzoCompressor.CompressionStrategy.LZO1X_1.name()
+ )
+ );
+ int bufferSize = conf.getInt("io.compression.codec.lzo.buffersize",
+ 64*1024);
+
+ return new LzoCompressor(strategy, bufferSize);
+ }
+
public CompressionInputStream createInputStream(InputStream in)
throws IOException {
// Ensure native-lzo library is loaded & initialized
@@ -146,7 +200,47 @@
new LzoDecompressor(strategy, bufferSize),
bufferSize);
}
-
+
+ public CompressionInputStream createInputStream(InputStream in,
+ Decompressor decompressor)
+ throws IOException {
+ // Ensure native-lzo library is loaded & initialized
+ if (!isNativeLzoLoaded()) {
+ throw new RuntimeException("native-lzo library not available");
+ }
+
+ return new BlockDecompressorStream(in, decompressor,
+ conf.getInt("io.compression.codec.lzo.buffersize",
+ 64*1024));
+ }
+
+ public Class getDecompressorType() {
+ // Ensure native-lzo library is loaded & initialized
+ if (!isNativeLzoLoaded()) {
+ throw new RuntimeException("native-lzo library not available");
+ }
+
+ return LzoDecompressor.class;
+ }
+
+ public Decompressor createDecompressor() {
+ // Ensure native-lzo library is loaded & initialized
+ if (!isNativeLzoLoaded()) {
+ throw new RuntimeException("native-lzo library not available");
+ }
+
+ LzoDecompressor.CompressionStrategy strategy =
+ LzoDecompressor.CompressionStrategy.valueOf(
+ conf.get("io.compression.codec.lzo.decompressor",
+ LzoDecompressor.CompressionStrategy.LZO1X.name()
+ )
+ );
+ int bufferSize = conf.getInt("io.compression.codec.lzo.buffersize",
+ 64*1024);
+
+ return new LzoDecompressor(strategy, bufferSize);
+ }
+
/**
* Get the default filename extension for this kind of compression.
* @return the extension including the '.'
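
[Both the LZO strategy and the buffer size are read from configuration, so
they can be tuned without code changes. For example (LZO1X_1 is the default
strategy per the code above; the buffer size shown is illustrative):

    Configuration conf = new Configuration();
    conf.set("io.compression.codec.lzo.compressor",
             LzoCompressor.CompressionStrategy.LZO1X_1.name());
    conf.setInt("io.compression.codec.lzo.buffersize", 128*1024);
]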
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java?view=diff&rev=548505&r1=548504&r2=548505
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java Mon Jun 18 14:59:36 2007
@@ -22,8 +22,6 @@
import java.nio.Buffer;
import java.nio.ByteBuffer;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.util.NativeCodeLoader;
@@ -35,7 +33,7 @@
* @author Arun C Murthy
*/
public class ZlibCompressor implements Compressor {
- private static final int DEFAULT_DIRECT_BUFFER_SIZE = 1*1024;
+ private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024;
private long stream;
private CompressionLevel level;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java?view=diff&rev=548505&r1=548504&r2=548505
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java Mon Jun 18 14:59:36 2007
@@ -22,8 +22,6 @@
import java.nio.Buffer;
import java.nio.ByteBuffer;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.util.NativeCodeLoader;
@@ -35,7 +33,7 @@
* @author Arun C Murthy
*/
public class ZlibDecompressor implements Decompressor {
- private static final int DEFAULT_DIRECT_BUFFER_SIZE = 1*1024;
+ private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024;
private long stream;
private CompressionHeader header;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java?view=diff&rev=548505&r1=548504&r2=548505
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java Mon Jun 18 14:59:36 2007
@@ -23,6 +23,7 @@
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.hadoop.util.StringUtils;
/**
* A collection of factories to create the right
@@ -60,21 +61,53 @@
}
/**
+ * Return the appropriate type of the zlib compressor.
+ *
+ * @return the appropriate type of the zlib compressor.
+ */
+ public static Class getZlibCompressorType() {
+ return (nativeZlibLoaded) ?
+ ZlibCompressor.class : BuiltInZlibDeflater.class;
+ }
+
+ /**
* Return the appropriate implementation of the zlib compressor.
*
* @return the appropriate implementation of the zlib compressor.
*/
public static Compressor getZlibCompressor() {
+ LOG.info("Creating a new ZlibCompressor");
+ try {
+ throw new Exception();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
return (nativeZlibLoaded) ?
new ZlibCompressor() : new BuiltInZlibDeflater();
}
/**
+ * Return the appropriate type of the zlib decompressor.
+ *
+ * @return the appropriate type of the zlib decompressor.
+ */
+ public static Class getZlibDecompressorType() {
+ return (nativeZlibLoaded) ?
+ ZlibDecompressor.class : BuiltInZlibInflater.class;
+ }
+
+ /**
* Return the appropriate implementation of the zlib decompressor.
*
* @return the appropriate implementation of the zlib decompressor.
*/
public static Decompressor getZlibDecompressor() {
+ LOG.info("Creating a new ZlibDecompressor");
+ try {
+ throw new Exception();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
return (nativeZlibLoaded) ?
new ZlibDecompressor() : new BuiltInZlibInflater();
}
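
[The new *Type() accessors let the codec pool key on the concrete class the
factory would actually instantiate, so native and pure-Java codecs never
share a free list:

    Class compressorType = ZlibFactory.getZlibCompressorType();
    // ZlibCompressor.class when native zlib is loaded,
    // BuiltInZlibDeflater.class otherwise, matching what getZlibCompressor() returns.
]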
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=548505&r1=548504&r2=548505
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Mon Jun 18 14:59:36 2007
@@ -42,6 +42,7 @@
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Sorter;
+import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
import org.apache.hadoop.io.SequenceFile.Sorter.SegmentDescriptor;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -332,10 +333,10 @@
job.getMapOutputValueClass(), compressionType, codec);
}
private void endPartition(int partNumber) throws IOException {
- //Need to write syncs especially if block compression is in use
+ //Need to close the file, especially if block compression is in use
//We also update the index file to contain the part offsets per
//spilled file
- writer.sync();
+ writer.close();
indexOut.writeLong(segmentStart);
//we also store 0 length key/val segments to make the merge phase easier.
indexOut.writeLong(out.getPos()-segmentStart);
@@ -529,11 +530,13 @@
//create dummy files
for (int i = 0; i < partitions; i++) {
segmentStart = finalOut.getPos();
- SequenceFile.createWriter(job, finalOut,
- job.getMapOutputKeyClass(), job.getMapOutputValueClass(),
- compressionType, codec);
+ Writer writer = SequenceFile.createWriter(job, finalOut,
+ job.getMapOutputKeyClass(),
+ job.getMapOutputValueClass(),
+ compressionType, codec);
finalIndexOut.writeLong(segmentStart);
finalIndexOut.writeLong(finalOut.getPos() - segmentStart);
+ writer.close();
}
finalOut.close();
finalIndexOut.close();
@@ -560,14 +563,14 @@
segmentList.add(i, s);
}
segmentStart = finalOut.getPos();
+ RawKeyValueIterator kvIter = sorter.merge(segmentList, new Path(getTaskId()));
SequenceFile.Writer writer = SequenceFile.createWriter(job, finalOut,
job.getMapOutputKeyClass(), job.getMapOutputValueClass(),
compressionType, codec);
- sorter.writeFile(sorter.merge(segmentList, new Path(getTaskId())),
- writer);
- //add a sync block - required esp. for block compression to ensure
+ sorter.writeFile(kvIter, writer);
+ //close the file - required esp. for block compression to ensure
//partition data don't span partition boundaries
- writer.sync();
+ writer.close();
//when we write the offset/length to the final index file, we write
//longs for both. This helps us to reliably seek directly to the
//offset/length for a partition when we start serving the byte-ranges
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java?view=diff&rev=548505&r1=548504&r2=548505
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java Mon Jun 18 14:59:36 2007
@@ -18,6 +18,7 @@
package org.apache.hadoop.io.compress;
+import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.*;
@@ -39,14 +40,44 @@
return conf;
}
- public CompressionOutputStream createOutputStream(OutputStream out) {
+ public CompressionOutputStream createOutputStream(OutputStream out)
+ throws IOException {
return null;
}
- public CompressionInputStream createInputStream(InputStream in) {
+ public Class getCompressorType() {
return null;
}
-
+
+ public Compressor createCompressor() {
+ return null;
+ }
+
+ public CompressionInputStream createInputStream(InputStream in,
+ Decompressor decompressor)
+ throws IOException {
+ return null;
+ }
+
+ public CompressionInputStream createInputStream(InputStream in)
+ throws IOException {
+ return null;
+ }
+
+ public CompressionOutputStream createOutputStream(OutputStream out,
+ Compressor compressor)
+ throws IOException {
+ return null;
+ }
+
+ public Class getDecompressorType() {
+ return null;
+ }
+
+ public Decompressor createDecompressor() {
+ return null;
+ }
+
public String getDefaultExtension() {
return ".base";
}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/BigMapOutput.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/BigMapOutput.java?view=diff&rev=548505&r1=548504&r2=548505
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/BigMapOutput.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/BigMapOutput.java Mon Jun 18 14:59:36 2007
@@ -1,42 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.hadoop.mapred;
import java.io.*;
-import java.net.URI;
-import java.net.URISyntaxException;
import java.util.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.lib.*;
+import org.apache.hadoop.util.ToolBase;
import org.apache.hadoop.fs.*;
-public class BigMapOutput {
+public class BigMapOutput extends ToolBase {
+ public static final Log LOG =
+ LogFactory.getLog(BigMapOutput.class.getName());
+ private static Random random = new Random();
+
+ private static void randomizeBytes(byte[] data, int offset, int length) {
+ for(int i=offset + length - 1; i >= offset; --i) {
+ data[i] = (byte) random.nextInt(256);
+ }
+ }
+
+ private static void createBigMapInputFile(Configuration conf, FileSystem fs,
+ Path dir, long fileSizeInMB)
+ throws IOException {
+ // Check if the input path exists and is non-empty
+ if (fs.exists(dir)) {
+ Path[] list = fs.listPaths(dir);
+ if (list != null && list.length > 0) {
+ throw new IOException("Input path: " + dir + " already exists... ");
+ }
+ }
+
+ Path file = new Path(dir, "part-0");
+ SequenceFile.Writer writer =
+ SequenceFile.createWriter(fs, conf, file,
+ BytesWritable.class, BytesWritable.class,
+ CompressionType.NONE);
+ long numBytesToWrite = fileSizeInMB * 1024 * 1024;
+ int minKeySize = conf.getInt("test.bmo.min_key", 10);
+ int keySizeRange =
+ conf.getInt("test.bmo.max_key", 1000) - minKeySize;
+ int minValueSize = conf.getInt("test.bmo.min_value", 0);
+ int valueSizeRange =
+ conf.getInt("test.bmo.max_value", 20000) - minValueSize;
+ BytesWritable randomKey = new BytesWritable();
+ BytesWritable randomValue = new BytesWritable();
+
+ LOG.info("Writing " + numBytesToWrite + " bytes to " + file + " with " +
+ "minKeySize: " + minKeySize + " keySizeRange: " + keySizeRange +
+ " minValueSize: " + minValueSize + " valueSizeRange: " + valueSizeRange);
+ long start = System.currentTimeMillis();
+ while (numBytesToWrite > 0) {
+ int keyLength = minKeySize +
+ (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
+ randomKey.setSize(keyLength);
+ randomizeBytes(randomKey.get(), 0, randomKey.getSize());
+ int valueLength = minValueSize +
+ (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
+ randomValue.setSize(valueLength);
+ randomizeBytes(randomValue.get(), 0, randomValue.getSize());
+ writer.append(randomKey, randomValue);
+ numBytesToWrite -= keyLength + valueLength;
+ }
+ writer.close();
+ long end = System.currentTimeMillis();
- public static void main(String[] args) throws IOException {
- if (args.length != 4) { //input-dir should contain a huge file ( > 2GB)
- System.err.println("BigMapOutput " +
- "-input <input-dir> -output <output-dir>");
- System.exit(1);
+ LOG.info("Created " + file + " of size: " + fileSizeInMB + "MB in " +
+ (end-start)/1000 + "secs");
+ }
+
+ private static void usage() {
+ System.err.println("BigMapOutput -input <input-dir> -output <output-dir> " +
+ "[-create <filesize in MB>]");
+ System.exit(1);
+ }
+ public int run(String[] args) throws Exception {
+ if (args.length < 4) { //input-dir should contain a huge file ( > 2GB)
+ usage();
}
Path bigMapInput = null;
Path outputPath = null;
+ boolean createInput = false;
+ long fileSizeInMB = 3 * 1024; // default of 3GB (>2GB)
for(int i=0; i < args.length; ++i) {
if ("-input".equals(args[i])){
bigMapInput = new Path(args[++i]);
} else if ("-output".equals(args[i])){
outputPath = new Path(args[++i]);
+ } else if ("-create".equals(args[i])) {
+ createInput = true;
+ fileSizeInMB = Long.parseLong(args[++i]);
+ } else {
+ usage();
}
}
- Configuration defaults = new Configuration();
- FileSystem fs = FileSystem.get(defaults);
- JobConf jobConf = new JobConf(defaults, BigMapOutput.class);
+ FileSystem fs = FileSystem.get(conf);
+ JobConf jobConf = new JobConf(conf, BigMapOutput.class);
jobConf.setJobName("BigMapOutput");
jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
@@ -50,12 +133,23 @@
jobConf.setReducerClass(IdentityReducer.class);
jobConf.setOutputKeyClass(BytesWritable.class);
jobConf.setOutputValueClass(BytesWritable.class);
-
+
+ if (createInput) {
+ createBigMapInputFile(jobConf, fs, bigMapInput, fileSizeInMB);
+ }
+
Date startTime = new Date();
System.out.println("Job started: " + startTime);
JobClient.runJob(jobConf);
Date end_time = new Date();
System.out.println("Job ended: " + end_time);
-
+
+ return 0;
}
+
+ public static void main(String argv[]) throws Exception {
+ int res = new BigMapOutput().doMain(new Configuration(), argv);
+ System.exit(res);
+ }
+
}
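
[With the ToolBase conversion, the test can now generate its own input via
-create. A hedged example of driving it programmatically (the paths are
illustrative; an equivalent command line would pass the same flags):

    String[] argv = { "-input",  "/bmo/input",
                      "-output", "/bmo/output",
                      "-create", "3072" };      // first create a ~3GB input file
    int res = new BigMapOutput().doMain(new Configuration(), argv);
]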