Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/10/03 23:08:16 UTC
svn commit: r452624 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/io/compress/
src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/io/
Author: cutting
Date: Tue Oct 3 14:08:15 2006
New Revision: 452624
URL: http://svn.apache.org/viewvc?view=rev&rev=452624
Log:
HADOOP-522. Permit block compression with MapFile and SetFile.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/MapFile.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SetFile.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSetFile.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=452624&r1=452623&r2=452624
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Oct 3 14:08:15 2006
@@ -112,6 +112,10 @@
stream. Implement an optimized version for DFS and local FS.
(Milind Bhandarkar via cutting)
+28. HADOOP-522. Permit block compression with MapFile and SetFile.
+ Since these formats are always sorted, block compression can
+ provide a big advantage. (cutting)
+
Release 0.6.2 - 2006-09-18
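
For illustration, here is a minimal sketch of the writer API this change adds. It is a hedged example, not code from the commit: the path and the Text/IntWritable key and value classes are placeholders.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;

public class BlockCompressedMapFileSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // New in this commit: a CompressionType argument replaces the old boolean.
    MapFile.Writer writer =
      new MapFile.Writer(conf, fs, "/tmp/example.map",
                         Text.class, IntWritable.class, CompressionType.BLOCK);
    writer.append(new Text("a"), new IntWritable(1)); // keys must arrive in order
    writer.append(new Text("b"), new IntWritable(2));
    writer.close();
  }
}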
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/MapFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/MapFile.java?view=diff&rev=452624&r1=452623&r2=452624
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/MapFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/MapFile.java Tue Oct 3 14:08:15 2006
@@ -19,6 +19,7 @@
import java.io.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
/** A file-based map from keys to values.
*
@@ -69,12 +70,20 @@
this(fs, dirName, WritableComparator.get(keyClass), valClass, false);
}
- /** Create the named map for keys of the named class. */
+ /** Create the named map for keys of the named class.
+ * @deprecated specify a {@link CompressionType} instead
+ */
public Writer(FileSystem fs, String dirName,
Class keyClass, Class valClass, boolean compress)
throws IOException {
this(fs, dirName, WritableComparator.get(keyClass), valClass, compress);
}
+ /** Create the named map for keys of the named class. */
+ public Writer(Configuration conf, FileSystem fs, String dirName,
+ Class keyClass, Class valClass, CompressionType compress)
+ throws IOException {
+ this(conf,fs,dirName,WritableComparator.get(keyClass),valClass,compress);
+ }
/** Create the named map using the named key comparator. */
public Writer(FileSystem fs, String dirName,
@@ -82,12 +91,24 @@
throws IOException {
this(fs, dirName, comparator, valClass, false);
}
- /** Create the named map using the named key comparator. */
+ /** Create the named map using the named key comparator.
+ * @deprecated specify a {@link CompressionType} instead
+ */
public Writer(FileSystem fs, String dirName,
WritableComparator comparator, Class valClass,
boolean compress)
throws IOException {
+ this(new Configuration(), fs, dirName, comparator, valClass,
+ compress ? CompressionType.RECORD : CompressionType.NONE);
+ }
+
+ /** Create the named map using the named key comparator. */
+ public Writer(Configuration conf, FileSystem fs, String dirName,
+ WritableComparator comparator, Class valClass,
+ SequenceFile.CompressionType compress)
+ throws IOException {
+
this.comparator = comparator;
this.lastKey = comparator.newKey();
@@ -99,9 +120,11 @@
Class keyClass = comparator.getKeyClass();
this.data =
- new SequenceFile.Writer(fs, dataFile, keyClass, valClass, compress);
+ SequenceFile.createWriter
+ (fs,conf,dataFile,keyClass,valClass,compress);
this.index =
- new SequenceFile.Writer(fs, indexFile, keyClass, LongWritable.class);
+ SequenceFile.createWriter
+ (fs,conf,indexFile,keyClass,LongWritable.class,CompressionType.BLOCK);
}
/** The number of entries that are added before an index entry is added.*/
@@ -159,9 +182,7 @@
private WritableComparator comparator;
- private DataOutputBuffer keyBuf = new DataOutputBuffer();
- private DataOutputBuffer nextBuf = new DataOutputBuffer();
- private int nextKeyLen = -1;
+ private WritableComparable nextKey;
private long seekPosition = -1;
private int seekIndex = -1;
private long firstPosition;
@@ -300,14 +321,11 @@
public synchronized boolean seek(WritableComparable key)
throws IOException {
readIndex(); // make sure index is read
- keyBuf.reset(); // write key to keyBuf
- key.write(keyBuf);
if (seekIndex != -1 // seeked before
&& seekIndex+1 < count
&& comparator.compare(key,keys[seekIndex+1])<0 // before next indexed
- && comparator.compare(keyBuf.getData(), 0, keyBuf.getLength(),
- nextBuf.getData(), 0, nextKeyLen)
+ && comparator.compare(key, nextKey)
>= 0) { // but after last seeked
// do nothing
} else {
@@ -322,14 +340,14 @@
}
data.seek(seekPosition);
- while ((nextKeyLen = data.next(nextBuf.reset())) != -1) {
- int c = comparator.compare(keyBuf.getData(), 0, keyBuf.getLength(),
- nextBuf.getData(), 0, nextKeyLen);
+ if (nextKey == null)
+ nextKey = comparator.newKey();
+
+ while (data.next(nextKey)) {
+ int c = comparator.compare(key, nextKey);
if (c <= 0) { // at or beyond desired
- data.seek(seekPosition); // back off to previous
return c == 0;
}
- seekPosition = data.getPosition();
}
return false;
@@ -366,7 +384,7 @@
public synchronized Writable get(WritableComparable key, Writable val)
throws IOException {
if (seek(key)) {
- next(getKey, val); // don't smash key
+ data.getCurrentValue(val);
return val;
} else
return null;
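
The Reader changes above drop the raw-buffer key comparisons in favor of a reusable nextKey instance, and get() now fetches the value with data.getCurrentValue(val) rather than re-reading the key. A short hedged sketch of the read side, using the same placeholder path and imports as the sketch under the CHANGES.txt entry:

static void readSketch(Configuration conf, FileSystem fs) throws IOException {
  MapFile.Reader reader = new MapFile.Reader(fs, "/tmp/example.map", conf);
  IntWritable val = new IntWritable();
  // get() seeks to the key, then reads the value via getCurrentValue(),
  // so the caller's key object is no longer overwritten.
  if (reader.get(new Text("b"), val) != null) {
    System.out.println("b -> " + val);
  }
  reader.close();
}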
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=452624&r1=452623&r2=452624
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Tue Oct 3 14:08:15 2006
@@ -592,7 +592,15 @@
val.writeUncompressedBytes(out); // value
}
- /** Returns the current length of the output file. */
+ /** Returns the current length of the output file.
+ *
+ * <p>This always returns a synchronized position. In other words,
+ * immediately after calling {@link Reader#seek(long)} with a position
+ * returned by this method, {@link Reader#next(Writable)} may be called. However
+ * the key may be earlier in the file than the key last written when this
+ * method was called (e.g., with block-compression, it may be the first key
+ * in the block that was being written when this method was called).
+ */
public synchronized long getLength() throws IOException {
return out.getPos();
}
@@ -1023,13 +1031,13 @@
valLenBuffer = new DataInputBuffer();
keyLenInFilter = this.codec.createInputStream(keyLenBuffer);
- keyLenIn = new DataInputStream(new BufferedInputStream(keyLenInFilter));
+ keyLenIn = new DataInputStream(keyLenInFilter);
keyInFilter = this.codec.createInputStream(keyBuffer);
- keyIn = new DataInputStream(new BufferedInputStream(keyInFilter));
+ keyIn = new DataInputStream(keyInFilter);
valLenInFilter = this.codec.createInputStream(valLenBuffer);
- valLenIn = new DataInputStream(new BufferedInputStream(valLenInFilter));
+ valLenIn = new DataInputStream(valLenInFilter);
}
@@ -1058,19 +1066,17 @@
/** Read a compressed buffer */
private synchronized void readBuffer(DataInputBuffer buffer,
- CompressionInputStream filter, boolean castAway) throws IOException {
+ CompressionInputStream filter) throws IOException {
// Read data into a temporary buffer
DataOutputBuffer dataBuffer = new DataOutputBuffer();
int dataBufferLength = WritableUtils.readVInt(in);
dataBuffer.write(in, dataBufferLength);
- if (false == castAway) {
- // Reset the codec
- filter.resetState();
-
- // Set up 'buffer' connected to the input-stream
- buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
- }
+ // Set up 'buffer' connected to the input-stream
+ buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
+
+ // Reset the codec
+ filter.resetState();
}
/** Read the next 'compressed' block */
@@ -1078,8 +1084,8 @@
// Check if we need to throw away a whole block of
// 'values' due to 'lazy decompression'
if (lazyDecompress && !valuesDecompressed) {
- readBuffer(null, null, true);
- readBuffer(null, null, true);
+ in.seek(WritableUtils.readVInt(in)+in.getPos());
+ in.seek(WritableUtils.readVInt(in)+in.getPos());
}
// Reset internal states
@@ -1099,14 +1105,14 @@
noBufferedRecords = WritableUtils.readVInt(in);
// Read key lengths and keys
- readBuffer(keyLenBuffer, keyLenInFilter, false);
- readBuffer(keyBuffer, keyInFilter, false);
+ readBuffer(keyLenBuffer, keyLenInFilter);
+ readBuffer(keyBuffer, keyInFilter);
noBufferedKeys = noBufferedRecords;
// Read value lengths and values
if (!lazyDecompress) {
- readBuffer(valLenBuffer, valLenInFilter, false);
- readBuffer(valBuffer, valInFilter, false);
+ readBuffer(valLenBuffer, valLenInFilter);
+ readBuffer(valBuffer, valInFilter);
noBufferedValues = noBufferedRecords;
valuesDecompressed = true;
}
@@ -1126,8 +1132,8 @@
// Check if this is the first value in the 'block' to be read
if (lazyDecompress && !valuesDecompressed) {
// Read the value lengths and values
- readBuffer(valLenBuffer, valLenInFilter, false);
- readBuffer(valBuffer, valInFilter, false);
+ readBuffer(valLenBuffer, valLenInFilter);
+ readBuffer(valBuffer, valInFilter);
noBufferedValues = noBufferedRecords;
valuesDecompressed = true;
}
@@ -1377,9 +1383,18 @@
}
}
- /** Set the current byte position in the input file. */
+ /** Set the current byte position in the input file.
+ *
+ * <p>The position passed must be a position returned by {@link
+ * Writer#getLength()} when writing this file. To seek to an arbitrary
+ * position, use {@link Reader#sync(long)}.
+ */
public synchronized void seek(long position) throws IOException {
in.seek(position);
+ if (blockCompressed) { // trigger block read
+ noBufferedKeys = 0;
+ valuesDecompressed = true;
+ }
}
/** Seek to the next sync mark past a given position.*/
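
A small sketch of the getLength()/seek() contract documented above. It is an assumption-laden example, not commit code; it reuses the placeholder classes from the earlier sketch and the createWriter factory that this commit itself calls from MapFile.java:

static void seekContractSketch(Configuration conf, FileSystem fs)
    throws IOException {
  Path file = new Path("/tmp/example.seq");
  SequenceFile.Writer writer = SequenceFile.createWriter
    (fs, conf, file, Text.class, IntWritable.class, CompressionType.BLOCK);
  writer.append(new Text("a"), new IntWritable(1));
  long mark = writer.getLength();          // always a synchronized position
  writer.append(new Text("b"), new IntWritable(2));
  writer.close();

  SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
  reader.seek(mark);                       // legal: mark came from getLength()
  Text key = new Text();
  IntWritable val = new IntWritable();
  while (reader.next(key, val)) {          // with block compression this may start
    System.out.println(key + "\t" + val);  // at the first key of the open block
  }
  reader.close();
}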
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SetFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SetFile.java?view=diff&rev=452624&r1=452623&r2=452624
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SetFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SetFile.java Tue Oct 3 14:08:15 2006
@@ -20,6 +20,7 @@
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
/** A file-based set of keys. */
public class SetFile extends MapFile {
@@ -38,6 +39,20 @@
public Writer(FileSystem fs, String dirName, WritableComparator comparator)
throws IOException {
super(fs, dirName, comparator, NullWritable.class);
+ }
+
+ /** Create a set naming the element class and compression type. */
+ public Writer(Configuration conf, FileSystem fs, String dirName,
+ Class keyClass, SequenceFile.CompressionType compress)
+ throws IOException {
+ this(conf, fs, dirName, WritableComparator.get(keyClass), compress);
+ }
+
+ /** Create a set naming the element comparator and compression type. */
+ public Writer(Configuration conf, FileSystem fs, String dirName,
+ WritableComparator comparator,
+ SequenceFile.CompressionType compress) throws IOException {
+ super(conf, fs, dirName, comparator, NullWritable.class, compress);
}
/** Append a key to a set. The key must be strictly greater than the
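
A hedged sketch of the new SetFile constructors above, with a placeholder directory name and elements:

static void setFileSketch(Configuration conf, FileSystem fs) throws IOException {
  SetFile.Writer writer =
    new SetFile.Writer(conf, fs, "/tmp/example.set",
                       Text.class, CompressionType.BLOCK);
  writer.append(new Text("apple"));        // elements must be strictly increasing
  writer.append(new Text("pear"));
  writer.close();

  SetFile.Reader reader = new SetFile.Reader(fs, "/tmp/example.set", conf);
  boolean member = reader.seek(new Text("pear")); // true iff the element is present
  reader.close();
}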
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java?view=diff&rev=452624&r1=452623&r2=452624
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java Tue Oct 3 14:08:15 2006
@@ -20,8 +20,13 @@
import java.io.InputStream;
/**
-* A compression input stream.
- * @author Arun C Murthy
+ * A compression input stream.
+ *
+ * <p>Implementations are assumed to be buffered. This permits clients to
+ * reposition the underlying input stream then call {@link #resetState()},
+ * without having to also synchronize client buffers.
+ *
+ * @author Arun C Murthy
*/
public abstract class CompressionInputStream extends InputStream {
/**
@@ -49,8 +54,8 @@
public abstract int read(byte[] b, int off, int len) throws IOException;
/**
- * Reset the compression to the initial state. Does not reset the underlying
- * stream.
+ * Reset the decompressor to its initial state and discard any buffered data,
+ * as the underlying stream may have been repositioned.
*/
public abstract void resetState() throws IOException;
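
A minimal sketch of the repositioning pattern this contract permits. Everything here is illustrative: blockStart is assumed to be a known compressed-block boundary, and codec is any CompressionCodec implementation.

static void repositionSketch(FileSystem fs, Path file,
                             CompressionCodec codec, long blockStart)
    throws IOException {
  FSDataInputStream raw = fs.open(file);
  CompressionInputStream in = codec.createInputStream(raw);
  raw.seek(blockStart);  // reposition the underlying stream directly
  in.resetState();       // discard buffered data and reset the decompressor
  int first = in.read(); // reads now decode from the new position
}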
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java?view=diff&rev=452624&r1=452623&r2=452624
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java Tue Oct 3 14:08:15 2006
@@ -23,6 +23,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Writable;
@@ -40,10 +41,10 @@
// ignore the progress parameter, since MapFile is local
final MapFile.Writer out =
- new MapFile.Writer(fs, file.toString(),
+ new MapFile.Writer(job, fs, file.toString(),
job.getMapOutputKeyClass(),
job.getMapOutputValueClass(),
- job.getBoolean("mapred.output.compress", false));
+ SequenceFile.getCompressionType(job));
return new RecordWriter() {
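
With this change the output format defers to SequenceFile.getCompressionType(job), so jobs choose the compression type through configuration rather than a boolean. A hedged sketch follows; SequenceFile.setCompressionType is assumed to be the setter paired with that getter:

static JobConf blockCompressedOutputSketch() {
  JobConf job = new JobConf();
  job.setOutputFormat(MapFileOutputFormat.class);
  // Assumed setter paired with SequenceFile.getCompressionType(job).
  SequenceFile.setCompressionType(job, SequenceFile.CompressionType.BLOCK);
  return job;
}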
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSetFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSetFile.java?view=diff&rev=452624&r1=452623&r2=452624
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSetFile.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSetFile.java Tue Oct 3 14:08:15 2006
@@ -24,6 +24,7 @@
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
/** Support for flat files of binary key/value pairs. */
public class TestSetFile extends TestCase {
@@ -39,7 +40,10 @@
FileSystem fs = new LocalFileSystem(conf);
try {
RandomDatum[] data = generate(10000);
- writeTest(fs, data, FILE);
+ writeTest(fs, data, FILE, CompressionType.NONE);
+ readTest(fs, data, FILE);
+
+ writeTest(fs, data, FILE, CompressionType.BLOCK);
readTest(fs, data, FILE);
} finally {
fs.close();
@@ -47,23 +51,27 @@
}
private static RandomDatum[] generate(int count) {
- LOG.debug("generating " + count + " records in memory");
+ LOG.info("generating " + count + " records in memory");
RandomDatum[] data = new RandomDatum[count];
RandomDatum.Generator generator = new RandomDatum.Generator();
for (int i = 0; i < count; i++) {
generator.next();
data[i] = generator.getValue();
}
- LOG.debug("sorting " + count + " records");
+ LOG.info("sorting " + count + " records");
Arrays.sort(data);
return data;
}
- private static void writeTest(FileSystem fs, RandomDatum[] data, String file)
+ private static void writeTest(FileSystem fs, RandomDatum[] data,
+ String file, CompressionType compress)
throws IOException {
MapFile.delete(fs, file);
- LOG.debug("creating with " + data.length + " records");
- SetFile.Writer writer = new SetFile.Writer(fs, file, RandomDatum.class);
+ LOG.info("creating with " + data.length + " records");
+ SetFile.Writer writer =
+ new SetFile.Writer(conf, fs, file,
+ WritableComparator.get(RandomDatum.class),
+ compress);
for (int i = 0; i < data.length; i++)
writer.append(data[i]);
writer.close();
@@ -72,14 +80,16 @@
private static void readTest(FileSystem fs, RandomDatum[] data, String file)
throws IOException {
RandomDatum v = new RandomDatum();
- LOG.debug("reading " + data.length + " records");
+ int sample = (int)Math.sqrt(data.length);
+ Random random = new Random();
+ LOG.info("reading " + sample + " records");
SetFile.Reader reader = new SetFile.Reader(fs, file, conf);
- for (int i = 0; i < data.length; i++) {
- if (!reader.seek(data[i]))
+ for (int i = 0; i < sample; i++) {
+ if (!reader.seek(data[random.nextInt(data.length)]))
throw new RuntimeException("wrong value at " + i);
}
reader.close();
- LOG.debug("done reading " + data.length + " records");
+ LOG.info("done reading " + data.length + " records");
}
@@ -89,7 +99,9 @@
boolean create = true;
boolean check = true;
String file = FILE;
- String usage = "Usage: TestSetFile (-local | -dfs <namenode:port>) [-count N] [-nocreate] [-nocheck] file";
+ String compress = "NONE";
+
+ String usage = "Usage: TestSetFile (-local | -dfs <namenode:port>) [-count N] [-nocreate] [-nocheck] [-compress type] file";
if (args.length == 0) {
System.err.println(usage);
@@ -108,26 +120,30 @@
create = false;
} else if (args[i].equals("-nocheck")) {
check = false;
+ } else if (args[i].equals("-compress")) {
+ compress = args[++i];
} else {
// file is required parameter
file = args[i];
}
+ }
- LOG.info("count = " + count);
- LOG.info("create = " + create);
- LOG.info("check = " + check);
- LOG.info("file = " + file);
-
- RandomDatum[] data = generate(count);
-
- if (create) {
- writeTest(fs, data, file);
- }
-
- if (check) {
- readTest(fs, data, file);
- }
+ LOG.info("count = " + count);
+ LOG.info("create = " + create);
+ LOG.info("check = " + check);
+ LOG.info("compress = " + compress);
+ LOG.info("file = " + file);
+
+ RandomDatum[] data = generate(count);
+
+ if (create) {
+ writeTest(fs, data, file, CompressionType.valueOf(compress));
}
+
+ if (check) {
+ readTest(fs, data, file);
+ }
+
} finally {
fs.close();
}
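
For reference, a typical local run of the updated test per the usage string above (the bin/hadoop class-launcher form is assumed):

bin/hadoop org.apache.hadoop.io.TestSetFile -local -count 10000 -compress BLOCK /tmp/test.set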