You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/09/07 22:30:22 UTC
svn commit: r441210 - in /lucene/hadoop/trunk: ./
src/examples/org/apache/hadoop/examples/ src/java/org/apache/hadoop/io/
src/java/org/apache/hadoop/io/compress/ src/java/org/apache/hadoop/mapred/
src/test/org/apache/hadoop/io/compress/
Author: cutting
Date: Thu Sep 7 13:30:21 2006
New Revision: 441210
URL: http://svn.apache.org/viewvc?view=rev&rev=441210
Log:
HADOOP-441. Add a compression codec API and extend SequenceFile to use it.
Added:
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionOutputStream.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/
lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodec.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=441210&r1=441209&r2=441210
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Sep 7 13:30:21 2006
@@ -148,6 +148,10 @@
advertised by datanodes and tasktrackers.
(Lorenzo Thione via cutting)
+37. HADOOP-441. Add a compression codec API and extend SequenceFile
+ to use it. This will permit the use of alternate compression
+ codecs in SequenceFile. (Arun C Murthy via cutting)
+
Release 0.5.0 - 2006-08-04
Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java?view=diff&rev=441210&r1=441209&r2=441210
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java Thu Sep 7 13:30:21 2006
@@ -41,6 +41,7 @@
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.Progressable;
/**
* This program uses map/reduce to just run a distributed job where there is
@@ -189,7 +190,7 @@
jobConf, file,
IntWritable.class, IntWritable.class,
CompressionType.NONE,
- null);
+ (Progressable)null);
writer.append(new IntWritable(0), new IntWritable(filesPerMap));
writer.close();
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=441210&r1=441209&r2=441210
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Thu Sep 7 13:30:21 2006
@@ -18,15 +18,19 @@
import java.io.*;
import java.util.*;
-import java.util.zip.*;
import java.net.InetAddress;
import java.rmi.server.UID;
import java.security.MessageDigest;
import org.apache.lucene.util.PriorityQueue;
import org.apache.commons.logging.*;
import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
/** Support for flat files of binary key/value pairs. */
public class SequenceFile {
@@ -36,8 +40,9 @@
private SequenceFile() {} // no public ctor
private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
+ private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
private static byte[] VERSION = new byte[] {
- (byte)'S', (byte)'E', (byte)'Q', BLOCK_COMPRESS_VERSION
+ (byte)'S', (byte)'E', (byte)'Q', CUSTOM_COMPRESS_VERSION
};
private static final int SYNC_ESCAPE = -1; // "length" of sync entries
@@ -79,9 +84,11 @@
if (compressionType == CompressionType.NONE) {
writer = new Writer(fs, conf, name, keyClass, valClass);
} else if (compressionType == CompressionType.RECORD) {
- writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass);
+ writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass,
+ new DefaultCodec());
} else if (compressionType == CompressionType.BLOCK){
- writer = new BlockCompressWriter(fs, conf, name, keyClass, valClass);
+ writer = new BlockCompressWriter(fs, conf, name, keyClass, valClass,
+ new DefaultCodec());
}
return writer;
@@ -106,13 +113,78 @@
Writer writer = null;
if (compressionType == CompressionType.NONE) {
+ writer = new Writer(fs, conf, name, keyClass, valClass, progress);
+ } else if (compressionType == CompressionType.RECORD) {
+ writer = new RecordCompressWriter(fs, conf, name,
+ keyClass, valClass, new DefaultCodec(), progress);
+ } else if (compressionType == CompressionType.BLOCK){
+ writer = new BlockCompressWriter(fs, conf, name,
+ keyClass, valClass, new DefaultCodec(), progress);
+ }
+
+ return writer;
+ }
+
+ /**
+ * Construct a SequenceFile Writer for the given compression type and codec.
+ * @param fs The configured filesystem.
+ * @param conf The configuration.
+ * @param name The name of the file.
+ * @param keyClass The 'key' type.
+ * @param valClass The 'value' type.
+ * @param compressionType The compression type.
+ * @param codec The compression codec (unused when compressionType is NONE).
+ * @return The constructed Writer, or null if compressionType is none of NONE/RECORD/BLOCK.
+ * @throws IOException propagated from the Writer constructors.
+ */
+ public static Writer
+ createWriter(FileSystem fs, Configuration conf, Path name,
+ Class keyClass, Class valClass,
+ CompressionType compressionType, CompressionCodec codec)
+ throws IOException {
+ Writer writer = null;
+
+ if (compressionType == CompressionType.NONE) {
+ writer = new Writer(fs, conf, name, keyClass, valClass);
+ } else if (compressionType == CompressionType.RECORD) {
+ writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass,
+ codec);
+ } else if (compressionType == CompressionType.BLOCK){
+ writer = new BlockCompressWriter(fs, conf, name, keyClass, valClass,
+ codec);
+ }
+
+ return writer;
+ }
+
+ /**
+ * Construct the preferred type of SequenceFile Writer.
+ * @param fs The configured filesystem.
+ * @param conf The configuration.
+ * @param name The name of the file.
+ * @param keyClass The 'key' type.
+ * @param valClass The 'value' type.
+ * @param compressionType The compression type.
+ * @param codec The compression codec.
+ * @param progress The Progressable object to track progress.
+ * @return Returns the handle to the constructed SequenceFile Writer.
+ * @throws IOException
+ */
+ public static Writer
+ createWriter(FileSystem fs, Configuration conf, Path name,
+ Class keyClass, Class valClass,
+ CompressionType compressionType, CompressionCodec codec,
+ Progressable progress) throws IOException {
+ Writer writer = null;
+
+ if (compressionType == CompressionType.NONE) {
writer = new Writer(fs, conf, name, keyClass, valClass, progress);
} else if (compressionType == CompressionType.RECORD) {
writer = new RecordCompressWriter(fs, conf, name,
- keyClass, valClass, progress);
+ keyClass, valClass, codec, progress);
} else if (compressionType == CompressionType.BLOCK){
writer = new BlockCompressWriter(fs, conf, name,
- keyClass, valClass, progress);
+ keyClass, valClass, codec, progress);
}
return writer;
@@ -130,16 +202,17 @@
*/
private static Writer
createWriter(FSDataOutputStream out,
- Class keyClass, Class valClass, boolean compress, boolean blockCompress)
+ Class keyClass, Class valClass, boolean compress, boolean blockCompress,
+ CompressionCodec codec)
throws IOException {
Writer writer = null;
if (!compress) {
writer = new Writer(out, keyClass, valClass);
} else if (compress && !blockCompress) {
- writer = new RecordCompressWriter(out, keyClass, valClass);
+ writer = new RecordCompressWriter(out, keyClass, valClass, codec);
} else {
- writer = new BlockCompressWriter(out, keyClass, valClass);
+ writer = new BlockCompressWriter(out, keyClass, valClass, codec);
}
return writer;
@@ -200,11 +273,14 @@
private static class CompressedBytes implements ValueBytes {
private int dataSize;
private byte[] data;
- private Inflater zlibInflater = null;
+ DataInputBuffer rawData = null;
+ CompressionCodec codec = null;
+ CompressionInputStream decompressedStream = null;
- private CompressedBytes() {
+ private CompressedBytes(CompressionCodec codec) {
data = null;
dataSize = 0;
+ this.codec = codec;
}
private void reset(DataInputStream in, int length) throws IOException {
@@ -221,21 +297,18 @@
public void writeUncompressedBytes(DataOutputStream outStream)
throws IOException {
- if (zlibInflater == null) {
- zlibInflater = new Inflater();
+ if (decompressedStream == null) {
+ rawData = new DataInputBuffer();
+ decompressedStream = codec.createInputStream(rawData);
} else {
- zlibInflater.reset();
+ decompressedStream.resetState();
}
- zlibInflater.setInput(data, 0, dataSize);
+ rawData.reset(data, 0, dataSize);
byte[] buffer = new byte[8192];
- while (!zlibInflater.finished()) {
- try {
- int noDecompressedBytes = zlibInflater.inflate(buffer);
- outStream.write(buffer, 0, noDecompressedBytes);
- } catch (DataFormatException e) {
- throw new IOException (e.toString());
- }
+ int bytesRead = 0;
+ while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) {
+ outStream.write(buffer, 0, bytesRead);
}
}
@@ -256,11 +329,9 @@
Class valClass;
private boolean compress;
- Deflater deflater = new Deflater(Deflater.BEST_SPEED);
- DeflaterOutputStream deflateFilter =
- new DeflaterOutputStream(buffer, deflater);
- DataOutputStream deflateOut =
- new DataOutputStream(new BufferedOutputStream(deflateFilter));
+ CompressionCodec codec = null;
+ CompressionOutputStream deflateFilter = null;
+ DataOutputStream deflateOut = null;
// Insert a globally unique 16-byte value every few entries, so that one
// can seek into the middle of a file and then synchronize with record
@@ -309,7 +380,7 @@
public Writer(FileSystem fs, Path name,
Class keyClass, Class valClass, boolean compress)
throws IOException {
- init(name, fs.create(name), keyClass, valClass, compress);
+ init(name, fs.create(name), keyClass, valClass, compress, null);
initializeFileHeader();
writeFileHeader();
@@ -324,7 +395,8 @@
Class keyClass, Class valClass, boolean compress,
Progressable progress)
throws IOException {
- init(name, fs.create(name, progress), keyClass, valClass, compress);
+ init(name, fs.create(name, progress), keyClass, valClass,
+ compress, null);
initializeFileHeader();
writeFileHeader();
@@ -347,8 +419,8 @@
/** Write to an arbitrary stream using a specified buffer size. */
private Writer(FSDataOutputStream out, Class keyClass, Class valClass)
- throws IOException {
- init(null, out, keyClass, valClass, false);
+ throws IOException {
+ init(null, out, keyClass, valClass, false, null);
initializeFileHeader();
writeFileHeader();
@@ -379,18 +451,28 @@
out.writeBoolean(this.isCompressed());
out.writeBoolean(this.isBlockCompressed());
+
+ if(this.isCompressed()) {
+ Text.writeString(out, (codec.getClass()).getName());
+ }
}
/** Initialize. */
void init(Path name, FSDataOutputStream out,
Class keyClass, Class valClass,
- boolean compress)
+ boolean compress, CompressionCodec codec)
throws IOException {
this.target = name;
this.out = out;
this.keyClass = keyClass;
this.valClass = valClass;
this.compress = compress;
+ this.codec = codec;
+ if(this.codec != null) {
+ this.deflateFilter = this.codec.createOutputStream(buffer);
+ this.deflateOut =
+ new DataOutputStream(new BufferedOutputStream(deflateFilter));
+ }
}
/** Returns the class of keys in this file. */
@@ -399,6 +481,9 @@
/** Returns the class of values in this file. */
public Class getValueClass() { return valClass; }
+ /** Returns the compression codec of data in this file, or null if none was supplied. */
+ public CompressionCodec getCompressionCodec() { return codec; }
+
/** Close the file. */
public synchronized void close() throws IOException {
if (out != null) {
@@ -435,7 +520,7 @@
// Append the 'value'
if (compress) {
- deflater.reset();
+ deflateFilter.resetState();
val.write(deflateOut);
deflateOut.flush();
deflateFilter.finish();
@@ -495,8 +580,9 @@
/** Create the named file. */
public RecordCompressWriter(FileSystem fs, Configuration conf, Path name,
- Class keyClass, Class valClass) throws IOException {
- super.init(name, fs.create(name), keyClass, valClass, true);
+ Class keyClass, Class valClass, CompressionCodec codec)
+ throws IOException {
+ super.init(name, fs.create(name), keyClass, valClass, true, codec);
initializeFileHeader();
writeFileHeader();
@@ -505,9 +591,11 @@
/** Create the named file with write-progress reporter. */
public RecordCompressWriter(FileSystem fs, Configuration conf, Path name,
- Class keyClass, Class valClass, Progressable progress)
+ Class keyClass, Class valClass, CompressionCodec codec,
+ Progressable progress)
throws IOException {
- super.init(name, fs.create(name, progress), keyClass, valClass, true);
+ super.init(name, fs.create(name, progress),
+ keyClass, valClass, true, codec);
initializeFileHeader();
writeFileHeader();
@@ -516,9 +604,9 @@
/** Write to an arbitrary stream using a specified buffer size. */
private RecordCompressWriter(FSDataOutputStream out,
- Class keyClass, Class valClass)
+ Class keyClass, Class valClass, CompressionCodec codec)
throws IOException {
- super.init(null, out, keyClass, valClass, true);
+ super.init(null, out, keyClass, valClass, true, codec);
initializeFileHeader();
writeFileHeader();
@@ -546,7 +634,7 @@
throw new IOException("zero length keys not allowed: " + key);
// Compress 'value' and append it
- deflater.reset();
+ deflateFilter.resetState();
val.write(deflateOut);
deflateOut.flush();
deflateFilter.finish();
@@ -590,19 +678,13 @@
private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
private DataOutputBuffer valBuffer = new DataOutputBuffer();
- private DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
- private Deflater deflater = new Deflater(Deflater.BEST_SPEED);
- private DeflaterOutputStream deflateFilter =
- new DeflaterOutputStream(compressedDataBuffer, deflater);
- private DataOutputStream deflateOut =
- new DataOutputStream(new BufferedOutputStream(deflateFilter));
-
private int compressionBlockSize;
/** Create the named file. */
public BlockCompressWriter(FileSystem fs, Configuration conf, Path name,
- Class keyClass, Class valClass) throws IOException {
- super.init(name, fs.create(name), keyClass, valClass, true);
+ Class keyClass, Class valClass, CompressionCodec codec)
+ throws IOException {
+ super.init(name, fs.create(name), keyClass, valClass, true, codec);
init(conf.getInt("mapred.seqfile.compress.blocksize", 1000000));
initializeFileHeader();
@@ -612,9 +694,11 @@
/** Create the named file with write-progress reporter. */
public BlockCompressWriter(FileSystem fs, Configuration conf, Path name,
- Class keyClass, Class valClass, Progressable progress)
+ Class keyClass, Class valClass, CompressionCodec codec,
+ Progressable progress)
throws IOException {
- super.init(name, fs.create(name, progress), keyClass, valClass, true);
+ super.init(name, fs.create(name, progress), keyClass, valClass,
+ true, codec);
init(conf.getInt("mapred.seqfile.compress.blocksize", 1000000));
initializeFileHeader();
@@ -624,9 +708,9 @@
/** Write to an arbitrary stream using a specified buffer size. */
private BlockCompressWriter(FSDataOutputStream out,
- Class keyClass, Class valClass)
+ Class keyClass, Class valClass, CompressionCodec codec)
throws IOException {
- super.init(null, out, keyClass, valClass, true);
+ super.init(null, out, keyClass, valClass, true, codec);
init(1000000);
initializeFileHeader();
@@ -644,17 +728,17 @@
/** Workhorse to check and write out compressed data/lengths */
private synchronized
- void writeBuffer(DataOutputBuffer buffer)
+ void writeBuffer(DataOutputBuffer uncompressedDataBuffer)
throws IOException {
- deflater.reset();
- compressedDataBuffer.reset();
- deflateOut.write(buffer.getData(), 0, buffer.getLength());
+ deflateFilter.resetState();
+ buffer.reset();
+ deflateOut.write(uncompressedDataBuffer.getData(), 0,
+ uncompressedDataBuffer.getLength());
deflateOut.flush();
deflateFilter.finish();
- WritableUtils.writeVInt(out, compressedDataBuffer.getLength());
- out.write(compressedDataBuffer.getData(),
- 0, compressedDataBuffer.getLength());
+ WritableUtils.writeVInt(out, buffer.getLength());
+ out.write(buffer.getData(), 0, buffer.getLength());
}
/** Compress and flush contents to dfs */
@@ -771,6 +855,8 @@
private Class keyClass;
private Class valClass;
+ private CompressionCodec codec = null;
+
private byte[] sync = new byte[SYNC_HASH_SIZE];
private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
private boolean syncSeen;
@@ -791,17 +877,17 @@
private int noBufferedValues = 0;
private DataInputBuffer keyLenBuffer = null;
- private Inflater keyLenInflater = null;
+ private CompressionInputStream keyLenInFilter = null;
private DataInputStream keyLenIn = null;
private DataInputBuffer keyBuffer = null;
- private Inflater keyInflater = null;
+ private CompressionInputStream keyInFilter = null;
private DataInputStream keyIn = null;
private DataInputBuffer valLenBuffer = null;
- private Inflater valLenInflater = null;
+ private CompressionInputStream valLenInFilter = null;
private DataInputStream valLenIn = null;
private DataInputBuffer valBuffer = null;
- private Inflater valInflater = null;
+ private CompressionInputStream valInFilter = null;
private DataInputStream valIn = null;
/** @deprecated Call {@link #SequenceFile.Reader(FileSystem,Path,Configuration)}.*/
@@ -870,6 +956,19 @@
this.blockCompressed = in.readBoolean(); // is block-compressed?
}
+ // if version >= 5
+ // setup the compression codec
+ if (version >= CUSTOM_COMPRESS_VERSION && this.decompress) {
+ try {
+ this.codec = (CompressionCodec)
+ ReflectionUtils.newInstance(conf.getClassByName(Text.readString(in)),
+ conf);
+ } catch (ClassNotFoundException cnfe) {
+ cnfe.printStackTrace();
+ throw new IllegalArgumentException("Unknown codec: " + cnfe);
+ }
+ }
+
if (version > 1) { // if version > 1
in.readFully(sync); // read sync bytes
}
@@ -877,10 +976,8 @@
// Initialize
valBuffer = new DataInputBuffer();
if (decompress) {
- valInflater = new Inflater();
- valIn = new DataInputStream(new BufferedInputStream(
- new InflaterInputStream(valBuffer, valInflater))
- );
+ valInFilter = this.codec.createInputStream(valBuffer);
+ valIn = new DataInputStream(new BufferedInputStream(valInFilter));
} else {
valIn = new DataInputStream(new BufferedInputStream(valBuffer));
}
@@ -890,19 +987,14 @@
keyBuffer = new DataInputBuffer();
valLenBuffer = new DataInputBuffer();
- keyLenInflater = new Inflater();
- keyLenIn = new DataInputStream(new BufferedInputStream(
- new InflaterInputStream(keyLenBuffer, keyLenInflater))
- );
-
- keyInflater = new Inflater();
- keyIn = new DataInputStream(new BufferedInputStream(
- new InflaterInputStream(keyBuffer, keyInflater)));
-
- valLenInflater = new Inflater();
- valLenIn = new DataInputStream(new BufferedInputStream(
- new InflaterInputStream(valLenBuffer, valLenInflater))
- );
+ keyLenInFilter = this.codec.createInputStream(keyLenBuffer);
+ keyLenIn = new DataInputStream(new BufferedInputStream(keyLenInFilter));
+
+ keyInFilter = this.codec.createInputStream(keyBuffer);
+ keyIn = new DataInputStream(new BufferedInputStream(keyInFilter));
+
+ valLenInFilter = this.codec.createInputStream(valLenBuffer);
+ valLenIn = new DataInputStream(new BufferedInputStream(valLenInFilter));
}
@@ -926,18 +1018,20 @@
/** Returns true if records are block-compressed. */
public boolean isBlockCompressed() { return blockCompressed; }
+ /** Returns the codec of data in this file; null unless compressed with version >= CUSTOM_COMPRESS_VERSION. */
+ public CompressionCodec getCompressionCodec() { return codec; }
+
/** Read a compressed buffer */
- private synchronized void readBuffer(
- DataInputBuffer buffer, Inflater inflater, boolean castAway
- ) throws IOException {
+ private synchronized void readBuffer(DataInputBuffer buffer,
+ CompressionInputStream filter, boolean castAway) throws IOException {
// Read data into a temporary buffer
DataOutputBuffer dataBuffer = new DataOutputBuffer();
int dataBufferLength = WritableUtils.readVInt(in);
dataBuffer.write(in, dataBufferLength);
if (false == castAway) {
- // Reset the inflater
- inflater.reset();
+ // Reset the codec
+ filter.resetState();
// Set up 'buffer' connected to the input-stream
buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
@@ -970,14 +1064,14 @@
noBufferedRecords = WritableUtils.readVInt(in);
// Read key lengths and keys
- readBuffer(keyLenBuffer, keyLenInflater, false);
- readBuffer(keyBuffer, keyInflater, false);
+ readBuffer(keyLenBuffer, keyLenInFilter, false);
+ readBuffer(keyBuffer, keyInFilter, false);
noBufferedKeys = noBufferedRecords;
// Read value lengths and values
if (!lazyDecompress) {
- readBuffer(valLenBuffer, valLenInflater, false);
- readBuffer(valBuffer, valInflater, false);
+ readBuffer(valLenBuffer, valLenInFilter, false);
+ readBuffer(valBuffer, valInFilter, false);
noBufferedValues = noBufferedRecords;
valuesDecompressed = true;
}
@@ -990,14 +1084,14 @@
private synchronized void seekToCurrentValue() throws IOException {
if (version < BLOCK_COMPRESS_VERSION || blockCompressed == false) {
if (decompress) {
- valInflater.reset();
+ valInFilter.resetState();
}
} else {
// Check if this is the first value in the 'block' to be read
if (lazyDecompress && !valuesDecompressed) {
// Read the value lengths and values
- readBuffer(valLenBuffer, valLenInflater, false);
- readBuffer(valBuffer, valInflater, false);
+ readBuffer(valLenBuffer, valLenInFilter, false);
+ readBuffer(valBuffer, valInFilter, false);
noBufferedValues = noBufferedRecords;
valuesDecompressed = true;
}
@@ -1168,7 +1262,7 @@
if (!decompress || blockCompressed) {
val = new UncompressedBytes();
} else {
- val = new CompressedBytes();
+ val = new CompressedBytes(codec);
}
return val;
}
@@ -1418,6 +1512,7 @@
boolean atEof = (currentFile >= inFiles.length);
boolean isCompressed = false;
boolean isBlockCompressed = false;
+ CompressionCodec codec = null;
segmentLengths.clear();
if (atEof) {
return 0;
@@ -1427,6 +1522,8 @@
in = new Reader(fs, inFiles[currentFile], conf);
isCompressed = in.isCompressed();
isBlockCompressed = in.isBlockCompressed();
+ codec = in.getCompressionCodec();
+
for (int i=0; i < rawValues.length; ++i) {
rawValues[i] = null;
}
@@ -1479,7 +1576,7 @@
LOG.debug("flushing segment " + segments);
rawBuffer = rawKeys.getData();
sort(count);
- flush(count, bytesProcessed, isCompressed, isBlockCompressed,
+ flush(count, bytesProcessed, isCompressed, isBlockCompressed, codec,
segments==0 && atEof);
segments++;
}
@@ -1523,7 +1620,8 @@
}
private void flush(int count, int bytesProcessed, boolean isCompressed,
- boolean isBlockCompressed, boolean done) throws IOException {
+ boolean isBlockCompressed, CompressionCodec codec, boolean done)
+ throws IOException {
if (out == null) {
outName = done ? outFile : outFile.suffix(".0");
out = fs.create(outName);
@@ -1534,7 +1632,7 @@
long segmentStart = out.getPos();
Writer writer = createWriter(out, keyClass, valClass,
- isCompressed, isBlockCompressed);
+ isCompressed, isBlockCompressed, codec);
if (!done) {
writer.sync = null; // disable sync on temp files
@@ -1757,11 +1855,13 @@
private boolean done;
private boolean compress;
private boolean blockCompress;
+ private CompressionCodec codec = null;
public void put(MergeStream stream) throws IOException {
if (size() == 0) {
compress = stream.in.isCompressed();
blockCompress = stream.in.isBlockCompressed();
+ codec = stream.in.getCompressionCodec();
} else if (compress != stream.in.isCompressed() ||
blockCompress != stream.in.isBlockCompressed()) {
throw new IOException("All merged files must be compressed or not.");
@@ -1790,7 +1890,7 @@
public void merge() throws IOException {
Writer writer = createWriter(out, keyClass, valClass,
- compress, blockCompress);
+ compress, blockCompress, codec);
if (!done) {
writer.sync = null; // disable sync on temp files
}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java?view=auto&rev=441210
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java Thu Sep 7 13:30:21 2006
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * This interface encapsulates a streaming compression/decompression pair.
+ * @author Owen O'Malley
+ */
+public interface CompressionCodec {
+
+ /**
+ * Create a stream compressor that will write to the given output stream.
+ * @param out the stream the compressed bytes are written to
+ * @return a stream the user can write uncompressed data to
+ */
+ CompressionOutputStream createOutputStream(OutputStream out)
+ throws IOException;
+
+ /**
+ * Create a stream decompressor that will read from the given input stream.
+ * @param in the stream to read compressed bytes from
+ * @return a stream to read uncompressed bytes from
+ */
+ CompressionInputStream createInputStream(InputStream in) throws IOException;
+
+ /**
+ * Get the default filename extension for this kind of compression.
+ * @return the extension including the '.'
+ */
+ String getDefaultExtension();
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java?view=auto&rev=441210
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java Thu Sep 7 13:30:21 2006
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A decompressing input stream that reads compressed bytes from an underlying stream.
+ * @author Arun C Murthy
+ */
+public abstract class CompressionInputStream extends InputStream {
+ /**
+ * The underlying stream that compressed bytes are read from.
+ */
+ protected final InputStream in;
+
+ /**
+ * Create a compression input stream that reads
+ * the compressed bytes from the given stream.
+ * @param in the stream to read compressed bytes from
+ */
+ protected CompressionInputStream(InputStream in) {
+ this.in = in;
+ }
+
+ public void close() throws IOException {
+ in.close();
+ }
+
+ /**
+ * Read decompressed bytes from the stream.
+ * Made abstract to prevent leakage to underlying stream.
+ */
+ public abstract int read(byte[] b, int off, int len) throws IOException;
+
+ /**
+ * Reset the decompressor to its initial state. Does not reset the underlying
+ * stream.
+ */
+ public abstract void resetState() throws IOException;
+
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionOutputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionOutputStream.java?view=auto&rev=441210
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionOutputStream.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionOutputStream.java Thu Sep 7 13:30:21 2006
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A compressing output stream that writes compressed bytes to an underlying stream.
+ * @author Arun C Murthy
+ */
+public abstract class CompressionOutputStream extends OutputStream {
+ /**
+ * The underlying stream that compressed bytes are written to.
+ */
+ protected final OutputStream out;
+
+ /**
+ * Create a compression output stream that writes
+ * the compressed bytes to the given stream.
+ * @param out the stream to write compressed bytes to
+ */
+ protected CompressionOutputStream(OutputStream out) {
+ this.out = out;
+ }
+
+ public void close() throws IOException {
+ finish();
+ out.close();
+ }
+
+ public void flush() throws IOException {
+ out.flush();
+ }
+
+ /**
+ * Write uncompressed bytes, to be compressed into the underlying stream.
+ * Made abstract to prevent leakage to underlying stream.
+ */
+ public abstract void write(byte[] b, int off, int len) throws IOException;
+
+ /**
+ * Finishes writing compressed data to the output stream
+ * without closing the underlying stream.
+ */
+ public abstract void finish() throws IOException;
+
+ /**
+ * Reset the compressor to its initial state.
+ * Does not reset the underlying stream.
+ */
+ public abstract void resetState() throws IOException;
+
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java?view=auto&rev=441210
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java Thu Sep 7 13:30:21 2006
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.*;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+/**
+ * The default {@link CompressionCodec}, implemented on top of
+ * java.util.zip's DeflaterOutputStream and InflaterInputStream
+ * (each using its default Deflater/Inflater).
+ */
+public class DefaultCodec implements CompressionCodec {
+
+  /**
+   * A bridge that wraps around a DeflaterOutputStream to make it
+   * a CompressionOutputStream.
+   * @author Owen O'Malley
+   */
+  protected static class DefaultCompressionOutputStream
+    extends CompressionOutputStream {
+
+    /**
+     * A DeflaterOutputStream that provides a mechanism to
+     * reset the compressor.
+     * @author Owen O'Malley
+     */
+    private static class ResetableDeflaterOutputStream
+      extends DeflaterOutputStream {
+
+      public ResetableDeflaterOutputStream(OutputStream out) {
+        super(out);
+      }
+
+      /** Resets the wrapped Deflater so a new compressed stream can begin. */
+      public void resetState() throws IOException {
+        def.reset();
+      }
+    }
+
+    public DefaultCompressionOutputStream(OutputStream out) {
+      super(new ResetableDeflaterOutputStream(out));
+    }
+
+    /**
+     * Allow children types to put a different type in here (namely gzip).
+     * A stream supplied this way cannot be reset by this class:
+     * {@link #resetState()} throws UnsupportedOperationException for it
+     * unless the subclass overrides that method.
+     * @param out the Deflater stream to use
+     */
+    protected DefaultCompressionOutputStream(DeflaterOutputStream out) {
+      super(out);
+    }
+
+    /**
+     * Closes the deflater stream, which writes any remaining compressed
+     * data and closes the underlying stream.
+     */
+    public void close() throws IOException {
+      out.close();
+    }
+
+    public void flush() throws IOException {
+      out.flush();
+    }
+
+    public void write(int b) throws IOException {
+      out.write(b);
+    }
+
+    public void write(byte[] data, int offset, int length)
+      throws IOException {
+      out.write(data, offset, length);
+    }
+
+    /** Finishes the compressed data without closing the underlying stream. */
+    public void finish() throws IOException {
+      // Both constructors only accept DeflaterOutputStreams, so this cast is safe.
+      ((DeflaterOutputStream) out).finish();
+    }
+
+    /**
+     * Resets the compressor so a fresh compressed stream can be written.
+     * @throws UnsupportedOperationException if a subclass supplied its own
+     *   DeflaterOutputStream, whose Deflater this class cannot reach
+     */
+    public void resetState() throws IOException {
+      if (!(out instanceof ResetableDeflaterOutputStream)) {
+        throw new UnsupportedOperationException(
+          "resetState() is not supported for " + out.getClass().getName());
+      }
+      ((ResetableDeflaterOutputStream) out).resetState();
+    }
+  }
+
+  /**
+   * A bridge that wraps around an InflaterInputStream to make it
+   * a CompressionInputStream.
+   */
+  protected static class DefaultCompressionInputStream
+    extends CompressionInputStream {
+
+    /**
+     * A InflaterStream that provides a mechanism to reset the decompressor.
+     * @author Owen O'Malley
+     */
+    private static class ResetableInflaterInputStream
+      extends InflaterInputStream {
+      public ResetableInflaterInputStream(InputStream in) {
+        super(in);
+      }
+
+      /** Resets the wrapped Inflater so a new compressed stream can be read. */
+      public void resetState() throws IOException {
+        inf.reset();
+      }
+    }
+
+    public DefaultCompressionInputStream(InputStream in) {
+      super(new ResetableInflaterInputStream(in));
+    }
+
+    /**
+     * Allow subclasses to directly set the inflater stream.
+     * A stream supplied this way cannot be reset by this class:
+     * {@link #resetState()} throws UnsupportedOperationException for it
+     * unless the subclass overrides that method.
+     */
+    protected DefaultCompressionInputStream(InflaterInputStream in) {
+      super(in);
+    }
+
+    public int available() throws IOException {
+      return in.available();
+    }
+
+    public void close() throws IOException {
+      in.close();
+    }
+
+    public int read() throws IOException {
+      return in.read();
+    }
+
+    public int read(byte[] data, int offset, int len) throws IOException {
+      return in.read(data, offset, len);
+    }
+
+    public long skip(long offset) throws IOException {
+      return in.skip(offset);
+    }
+
+    /**
+     * Resets the decompressor so a fresh compressed stream can be read.
+     * @throws UnsupportedOperationException if a subclass supplied its own
+     *   InflaterInputStream, whose Inflater this class cannot reach
+     */
+    public void resetState() throws IOException {
+      if (!(in instanceof ResetableInflaterInputStream)) {
+        throw new UnsupportedOperationException(
+          "resetState() is not supported for " + in.getClass().getName());
+      }
+      ((ResetableInflaterInputStream) in).resetState();
+    }
+
+  }
+
+  /**
+   * Create a stream compressor that will write to the given output stream.
+   * @param out the location for the final output stream
+   * @return a stream the user can write uncompressed data to
+   */
+  public CompressionOutputStream createOutputStream(OutputStream out)
+    throws IOException {
+    return new DefaultCompressionOutputStream(out);
+  }
+
+  /**
+   * Create a stream decompressor that will read from the given input stream.
+   * @param in the stream to read compressed bytes from
+   * @return a stream to read uncompressed bytes from
+   */
+  public CompressionInputStream createInputStream(InputStream in)
+    throws IOException {
+    return new DefaultCompressionInputStream(in);
+  }
+
+  /**
+   * Get the default filename extension for this kind of compression.
+   * @return the extension including the '.'
+   */
+  public String getDefaultExtension() {
+    return ".deflate";
+  }
+
+}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=441210&r1=441209&r2=441210
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Thu Sep 7 13:30:21 2006
@@ -19,6 +19,7 @@
import java.io.*;
import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.metrics.MetricsRecord;
@@ -123,15 +124,20 @@
final SequenceFile.Writer[] outs = new SequenceFile.Writer[partitions];
try {
FileSystem localFs = FileSystem.getNamed("local", job);
+ /** TODO: Figure out a way to deprecate 'mapred.compress.map.output' */
boolean compressTemps = job.getBoolean("mapred.compress.map.output",
false);
for (int i = 0; i < partitions; i++) {
outs[i] =
- new SequenceFile.Writer(localFs,
+ SequenceFile.createWriter(localFs, job,
this.mapOutputFile.getOutputFile(getTaskId(), i),
job.getMapOutputKeyClass(),
job.getMapOutputValueClass(),
- compressTemps);
+ compressTemps ? CompressionType.RECORD :
+ CompressionType.valueOf(
+ job.get("mapred.seqfile.compression.type",
+ "NONE"))
+ );
LOG.info("opened "+this.mapOutputFile.getOutputFile(getTaskId(), i).getName());
}
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodec.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodec.java?view=auto&rev=441210
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodec.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodec.java Thu Sep 7 13:30:21 2006
@@ -0,0 +1,138 @@
+package org.apache.hadoop.io.compress;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.RandomDatum;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+
+public class TestCodec extends TestCase {
+
+  private static final Log LOG=
+    LogFactory.getLog("org.apache.hadoop.io.compress.TestCodec");
+
+  /** Round-trips random records through DefaultCodec using a random seed. */
+  public void testCodec() throws IOException {
+    int count = 10000;
+    int seed = new Random().nextInt();
+
+    codecTest(seed, count, "org.apache.hadoop.io.compress.DefaultCodec");
+  }
+
+  /**
+   * Compresses {@code count} random key/value records with the named codec,
+   * decompresses them again, and asserts that every record survives the
+   * round trip unchanged.
+   * @param seed seed for the random record generator (logged so failures
+   *   can be reproduced)
+   * @param count number of key/value pairs to generate
+   * @param codecClass fully qualified name of a CompressionCodec class
+   * @throws IOException if the codec class cannot be loaded or I/O fails
+   */
+  private static void codecTest(int seed, int count, String codecClass)
+    throws IOException {
+
+    // Create the codec
+    Configuration conf = new Configuration();
+    CompressionCodec codec = null;
+    try {
+      codec = (CompressionCodec)
+        ReflectionUtils.newInstance(conf.getClassByName(codecClass), conf);
+    } catch (ClassNotFoundException cnfe) {
+      // Chain the cause so the report shows why the class could not be loaded.
+      IOException ioe = new IOException("Illegal codec!");
+      ioe.initCause(cnfe);
+      throw ioe;
+    }
+    LOG.debug("Created a Codec object of type: " + codecClass);
+    LOG.info("Testing " + codecClass + " with seed " + seed);
+
+    // Generate data
+    DataOutputBuffer data = new DataOutputBuffer();
+    RandomDatum.Generator generator = new RandomDatum.Generator(seed);
+    for(int i=0; i < count; ++i) {
+      generator.next();
+      RandomDatum key = generator.getKey();
+      RandomDatum value = generator.getValue();
+
+      key.write(data);
+      value.write(data);
+    }
+    DataInputBuffer originalData = new DataInputBuffer();
+    DataInputStream originalIn = new DataInputStream(new BufferedInputStream(originalData));
+    originalData.reset(data.getData(), 0, data.getLength());
+
+    LOG.debug("Generated " + count + " records");
+
+    // Compress data
+    DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
+    CompressionOutputStream deflateFilter =
+      codec.createOutputStream(compressedDataBuffer);
+    DataOutputStream deflateOut =
+      new DataOutputStream(new BufferedOutputStream(deflateFilter));
+
+    deflateFilter.resetState();
+    compressedDataBuffer.reset();
+    deflateOut.write(data.getData(), 0, data.getLength());
+    deflateOut.flush();
+    deflateFilter.finish();
+    LOG.debug("Finished compressing data");
+
+    // De-compress data
+    DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
+    CompressionInputStream inflateFilter =
+      codec.createInputStream(deCompressedDataBuffer);
+    DataInputStream inflateIn =
+      new DataInputStream(new BufferedInputStream(inflateFilter));
+
+    inflateFilter.resetState();
+    deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
+                                 compressedDataBuffer.getLength());
+
+    // Check: each decompressed record must equal the original one.
+    for(int i=0; i < count; ++i) {
+      RandomDatum k1 = new RandomDatum();
+      RandomDatum v1 = new RandomDatum();
+      k1.readFields(originalIn);
+      v1.readFields(originalIn);
+
+      RandomDatum k2 = new RandomDatum();
+      RandomDatum v2 = new RandomDatum();
+      k2.readFields(inflateIn);
+      v2.readFields(inflateIn);
+
+      assertTrue("original and compressed-then-decompressed output differ"
+                 + " at record " + i + " (seed " + seed + ")",
+                 k1.equals(k2) && v1.equals(v2));
+    }
+    LOG.debug("SUCCESS! Completed checking " + count + " records");
+  }
+
+  /**
+   * Command-line driver. Runs one round trip with a fixed seed and exits
+   * with a non-zero status if it fails.
+   */
+  public static void main(String[] args) {
+    int count = 10000;
+    String codecClass = "org.apache.hadoop.io.compress.DefaultCodec";
+
+    String usage = "TestCodec [-count N] [-codec <codec class>]";
+    if (args.length == 0) {
+      System.err.println(usage);
+      System.exit(-1);
+    }
+
+    try {
+      for (int i=0; i < args.length; ++i) { // parse command line
+        if (args[i] == null) {
+          continue;
+        } else if (args[i].equals("-count")) {
+          count = Integer.parseInt(args[++i]);
+        } else if (args[i].equals("-codec")) {
+          codecClass = args[++i];
+        }
+      }
+
+      int seed = 0;
+      codecTest(seed, count, codecClass);
+    } catch (Exception e) {
+      System.err.println("Caught: " + e);
+      e.printStackTrace();
+      // Report failure to the calling shell instead of exiting with status 0.
+      System.exit(-1);
+    }
+
+  }
+
+  public TestCodec(String name) {
+    super(name);
+  }
+
+}