You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text rendering).
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/08/10 18:36:48 UTC
svn commit: r231281 - in /lucene/nutch/branches/mapred/src:
java/org/apache/nutch/io/DataInputBuffer.java
java/org/apache/nutch/io/SequenceFile.java
test/org/apache/nutch/io/TestSequenceFile.java
Author: cutting
Date: Wed Aug 10 09:36:41 2005
New Revision: 231281
URL: http://svn.apache.org/viewcvs?rev=231281&view=rev
Log:
Add value compression option to SequenceFile.
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/DataInputBuffer.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java
lucene/nutch/branches/mapred/src/test/org/apache/nutch/io/TestSequenceFile.java
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/DataInputBuffer.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/DataInputBuffer.java?rev=231281&r1=231280&r2=231281&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/DataInputBuffer.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/DataInputBuffer.java Wed Aug 10 09:36:41 2005
@@ -53,6 +53,7 @@
}
public int getPosition() { return pos; }
+ public int getLength() { return count; }
}
private Buffer buffer;
@@ -79,5 +80,8 @@
/** Returns the current position in the input. */
public int getPosition() { return buffer.getPosition(); }
+
+ /** Returns the length of the input. */
+ public int getLength() { return buffer.getLength(); }
}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java?rev=231281&r1=231280&r2=231281&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java Wed Aug 10 09:36:41 2005
@@ -18,6 +18,7 @@
import java.io.*;
import java.util.*;
+import java.util.zip.*;
import java.util.logging.*;
import java.nio.channels.*;
import java.net.InetAddress;
@@ -35,7 +36,7 @@
private SequenceFile() {} // no public ctor
private static byte[] VERSION = new byte[] {
- (byte)'S', (byte)'E', (byte)'Q', 2
+ (byte)'S', (byte)'E', (byte)'Q', 3
};
private static final int SYNC_ESCAPE = -1; // "length" of sync entries
@@ -55,6 +56,11 @@
private Class keyClass;
private Class valClass;
+ private boolean deflateValues;
+ private DataOutputBuffer deflateIn = new DataOutputBuffer();
+ private byte[] deflateOut = new byte[1024];
+ private Deflater deflater = new Deflater();
+
// Insert a globally unique 16-byte value every few entries, so that one
// can seek into the middle of a file and then synchronize with record
// starts and ends by scanning for this value.
@@ -74,30 +80,45 @@
public Writer(NutchFileSystem nfs, String name,
Class keyClass, Class valClass)
throws IOException {
+ this(nfs, name, keyClass, valClass, false);
+ }
+
+ /** Create the named file.
+ * @param compress if true, values are compressed.
+ */
+ public Writer(NutchFileSystem nfs, String name,
+ Class keyClass, Class valClass, boolean compress)
+ throws IOException {
this.nfs = nfs;
this.target = new File(name);
init(new NFSDataOutputStream(nfs.create(target)),
- keyClass, valClass);
+ keyClass, valClass, compress);
}
/** Write to an arbitrary stream using a specified buffer size. */
private Writer(NFSDataOutputStream out,
- Class keyClass, Class valClass) throws IOException {
- init(out, keyClass, valClass);
+ Class keyClass, Class valClass, boolean compress)
+ throws IOException {
+ init(out, keyClass, valClass, compress);
}
/** Write and flush the file header. */
private void init(NFSDataOutputStream out,
- Class keyClass, Class valClass) throws IOException {
+ Class keyClass, Class valClass,
+ boolean compress) throws IOException {
this.out = out;
this.out.write(VERSION);
this.keyClass = keyClass;
this.valClass = valClass;
+ this.deflateValues = compress;
+
new UTF8(WritableName.getName(keyClass)).write(this.out);
new UTF8(WritableName.getName(valClass)).write(this.out);
+ this.out.writeBoolean(deflateValues);
+
out.write(sync); // write the sync bytes
this.out.flush(); // flush header
@@ -134,7 +155,20 @@
if (keyLength == 0)
throw new IOException("zero length keys not allowed: " + key);
- val.write(buffer);
+ if (deflateValues) {
+ deflateIn.reset();
+ val.write(deflateIn);
+ deflater.reset();
+ deflater.setInput(deflateIn.getData(), 0, deflateIn.getLength());
+ deflater.finish();
+ while (!deflater.finished()) {
+ int count = deflater.deflate(deflateOut);
+ buffer.write(deflateOut, 0, count);
+ }
+ } else {
+ val.write(buffer);
+ }
+
append(buffer.getData(), 0, buffer.getLength(), keyLength);
}
@@ -185,6 +219,11 @@
private long end;
private int keyLength;
+ private boolean inflateValues;
+ private byte[] inflateIn = new byte[1024];
+ private DataOutputBuffer inflateOut = new DataOutputBuffer();
+ private Inflater inflater = new Inflater();
+
/** Open the named file. */
public Reader(NutchFileSystem nfs, String file) throws IOException {
this(nfs, file, NutchConf.get().getInt("io.file.buffer.size", 4096));
@@ -229,6 +268,10 @@
className.readFields(in); // read val class name
this.valClass = WritableName.getClass(className.toString());
+ if (version[3] > 2) { // if version > 2
+ this.inflateValues = in.readBoolean(); // is compressed?
+ }
+
if (version[3] > 1) { // if version > 1
in.readFully(sync); // read sync bytes
}
@@ -245,6 +288,9 @@
/** Returns the class of values in this file. */
public Class getValueClass() { return valClass; }
+ /** Returns true if values are compressed. */
+ public boolean isCompressed() { return inflateValues; }
+
/** Read the next key in the file into <code>key</code>, skipping its
* value. True if another entry exists, and false at end of file. */
public synchronized boolean next(Writable key) throws IOException {
@@ -278,11 +324,29 @@
boolean more = next(key);
if (more) {
+
+ if (inflateValues) {
+ inflater.reset();
+ inflater.setInput(outBuf.getData(), keyLength,
+ outBuf.getLength()-keyLength);
+ inflateOut.reset();
+ while (!inflater.finished()) {
+ try {
+ int count = inflater.inflate(inflateIn);
+ inflateOut.write(inflateIn, 0, count);
+ } catch (DataFormatException e) {
+ throw new IOException (e.toString());
+ }
+ }
+ inBuf.reset(inflateOut.getData(), inflateOut.getLength());
+ }
+
val.readFields(inBuf);
- if (inBuf.getPosition() != outBuf.getLength())
+
+ if (inBuf.getPosition() != inBuf.getLength())
throw new IOException(val+" read "+(inBuf.getPosition()-keyLength)
+ " bytes, should read " +
- (outBuf.getLength()-keyLength));
+ (inBuf.getLength()-keyLength));
}
return more;
@@ -533,7 +597,7 @@
out.writeLong(count); // write count
}
- Writer writer = new Writer(out, keyClass, valClass);
+ Writer writer = new Writer(out, keyClass, valClass, in.isCompressed());
if (!done) {
writer.sync = null; // disable sync on temp files
}
@@ -651,7 +715,7 @@
MergeStream ms = new MergeStream(reader); // add segment to queue
if (ms.next()) {
- queue.put(ms);
+ queue.add(ms);
}
in.seek(reader.end);
}
@@ -740,6 +804,16 @@
private class MergeQueue extends PriorityQueue {
private NFSDataOutputStream out;
private boolean done;
+ private boolean compress;
+
+ public void add(MergeStream stream) throws IOException {
+ if (size() == 0) {
+ compress = stream.in.isCompressed();
+ } else if (compress != stream.in.isCompressed()) {
+ throw new IOException("All merged files must be compressed or not.");
+ }
+ put(stream);
+ }
public MergeQueue(int size, String outName, boolean done)
throws IOException {
@@ -758,7 +832,7 @@
}
public void merge() throws IOException {
- Writer writer = new Writer(out, keyClass, valClass);
+ Writer writer = new Writer(out, keyClass, valClass, compress);
if (!done) {
writer.sync = null; // disable sync on temp files
}
Modified: lucene/nutch/branches/mapred/src/test/org/apache/nutch/io/TestSequenceFile.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/test/org/apache/nutch/io/TestSequenceFile.java?rev=231281&r1=231280&r2=231281&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/test/org/apache/nutch/io/TestSequenceFile.java (original)
+++ lucene/nutch/branches/mapred/src/test/org/apache/nutch/io/TestSequenceFile.java Wed Aug 10 09:36:41 2005
@@ -42,7 +42,7 @@
NutchFileSystem nfs = new LocalFileSystem();
try {
//LOG.setLevel(Level.FINE);
- writeTest(nfs, count, seed, file);
+ writeTest(nfs, count, seed, file, false);
readTest(nfs, count, seed, file);
sortTest(nfs, count, megabytes, factor, false, file);
@@ -61,12 +61,14 @@
}
}
- private static void writeTest(NutchFileSystem nfs, int count, int seed, String file)
+ private static void writeTest(NutchFileSystem nfs, int count, int seed,
+ String file, boolean compress)
throws IOException {
new File(file).delete();
LOG.fine("creating with " + count + " records");
SequenceFile.Writer writer =
- new SequenceFile.Writer(nfs, file, RandomDatum.class, RandomDatum.class);
+ new SequenceFile.Writer(nfs, file, RandomDatum.class, RandomDatum.class,
+ compress);
RandomDatum.Generator generator = new RandomDatum.Generator(seed);
for (int i = 0; i < count; i++) {
generator.next();
@@ -210,8 +212,9 @@
boolean check = false;
boolean fast = false;
boolean merge = false;
+ boolean compress = false;
String file = null;
- String usage = "Usage: SequenceFile (-local | -ndfs <namenode:port>) [-count N] [-megabytes M] [-factor F] [-nocreate] [-check] [-fast] [-merge] file";
+ String usage = "Usage: SequenceFile (-local | -ndfs <namenode:port>) [-count N] [-megabytes M] [-factor F] [-nocreate] [-check] [-fast] [-merge] [-compress] file";
if (args.length == 0) {
System.err.println(usage);
@@ -237,6 +240,8 @@
fast = true;
} else if (args[i].equals("-merge")) {
merge = true;
+ } else if (args[i].equals("-compress")) {
+ compress = true;
} else {
// file is required parameter
file = args[i];
@@ -249,6 +254,7 @@
LOG.info("check = " + check);
LOG.info("fast = " + fast);
LOG.info("merge = " + merge);
+ LOG.info("compress = " + compress);
LOG.info("file = " + file);
int seed = 0;
@@ -256,7 +262,7 @@
LOG.setLevel(Level.FINE);
if (create && !merge) {
- writeTest(nfs, count, seed, file);
+ writeTest(nfs, count, seed, file, compress);
readTest(nfs, count, seed, file);
}