Posted to commits@avro.apache.org by ph...@apache.org on 2010/01/22 02:28:02 UTC
svn commit: r901959 - in /hadoop/avro/trunk: ./ doc/src/content/xdocs/
lang/java/ lang/java/src/java/org/apache/avro/file/
lang/java/src/java/org/apache/avro/tool/
lang/java/src/test/java/org/apache/avro/
lang/java/src/test/java/org/apache/avro/file/ l...
Author: philz
Date: Fri Jan 22 01:28:00 2010
New Revision: 901959
URL: http://svn.apache.org/viewvc?rev=901959&view=rev
Log:
AVRO-135. Add compression to data files. (philz)
Added:
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/CodecFactory.java
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/LengthLimitedInputStream.java
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java
hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileDeflate.java
hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/file/
hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/file/TestLengthLimitedInputStream.java
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/doc/src/content/xdocs/spec.xml
hadoop/avro/trunk/lang/java/build.xml
hadoop/avro/trunk/lang/java/ivy.xml
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/tool/DataFileWriteTool.java
hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFile.java
hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/tool/TestDataFileTools.java
Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=901959&r1=901958&r2=901959&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Fri Jan 22 01:28:00 2010
@@ -69,6 +69,8 @@
AVRO-346. Add function to validate a datum against a schema. (massie)
AVRO-306. Add Ruby implementation. (Jeff Hodges via cutting)
+
+ AVRO-135. Add compression to data files. (philz)
IMPROVEMENTS
Modified: hadoop/avro/trunk/doc/src/content/xdocs/spec.xml
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/doc/src/content/xdocs/spec.xml?rev=901959&r1=901958&r2=901959&view=diff
==============================================================================
--- hadoop/avro/trunk/doc/src/content/xdocs/spec.xml (original)
+++ hadoop/avro/trunk/doc/src/content/xdocs/spec.xml Fri Jan 22 01:28:00 2010
@@ -624,10 +624,10 @@
<li><strong>schema</strong> contains the schema of objects
stored in the file, as JSON data (required).</li>
<li><strong>codec</strong> the name of the compression codec
- used to compress blocks, as a string. The only value for codec
- currently supported is "null" (meaning no compression is
- performed). If codec is absent, it is assumed to be
- "null".</li>
+ used to compress blocks, as a string. Implementations
+ are required to support the following codecs: "null" and "deflate".
+ If codec is absent, it is assumed to be "null". The codecs
+ are described in more detail below.</li>
</ul>
<p>A file header is thus described by the following schema:</p>
@@ -649,6 +649,26 @@
<li>The file's 16-byte sync marker.</li>
</ul>
+ <section>
+ <title>Required Codecs</title>
+ <section>
+ <title>null</title>
+ <p>The "null" codec simply passes through data uncompressed.</p>
+ </section>
+
+ <section>
+ <title>deflate</title>
+ <p>The "deflate" codec writes the length of the compressed
+ data (as an Avro-encoded long), followed by data compressed using the
+ deflate algorithm as specified in
+ <a href="http://www.isi.edu/in-notes/rfc1951.txt">RFC 1951</a>,
+ and typically implemented using the zlib library. Note that this
+ format (unlike the "zlib format" in RFC 1950) does not have a
+ checksum.
+ </p>
+ </section>
+ </section>
+
</section>
<section>
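
As an illustrative sketch (not part of this commit), the deflate block layout
specified above is an Avro-encoded long holding the compressed byte count,
followed by raw RFC 1951 data. A minimal Java rendering using only
java.util.zip; the class and helper names are hypothetical:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.zip.Deflater;
    import java.util.zip.DeflaterOutputStream;

    class DeflateBlockSketch {
      /** Writes one block body: Avro long length prefix, then deflate data. */
      static void writeDeflateBlock(byte[] blockBytes, OutputStream out)
          throws IOException {
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        Deflater deflater =
            new Deflater(Deflater.DEFAULT_COMPRESSION, true); // nowrap: RFC 1951
        DeflaterOutputStream ds = new DeflaterOutputStream(compressed, deflater);
        ds.write(blockBytes);
        ds.finish();
        writeAvroLong(compressed.size(), out);
        compressed.writeTo(out);
      }

      /** Avro's long encoding: zig-zag, then little-endian base-128 varint. */
      static void writeAvroLong(long n, OutputStream out) throws IOException {
        long z = (n << 1) ^ (n >> 63); // zig-zag
        while ((z & ~0x7FL) != 0) {
          out.write((int) ((z & 0x7F) | 0x80));
          z >>>= 7;
        }
        out.write((int) z);
      }
    }
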
Modified: hadoop/avro/trunk/lang/java/build.xml
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/build.xml?rev=901959&r1=901958&r2=901959&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/build.xml (original)
+++ hadoop/avro/trunk/lang/java/build.xml Fri Jan 22 01:28:00 2010
@@ -250,7 +250,7 @@
</checkstyle>
</target>
- <target name="compile-test-java" depends="ivy-retrieve-test,compile">
+ <target name="compile-test-java" depends="ivy-retrieve-test,ivy-retrieve-tools,compile">
<java-avro-compiler src="${test.schemata.dir}"
generated="${test.java.generated.dir}"
dest="${test.java.generated.classes}"
Modified: hadoop/avro/trunk/lang/java/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/ivy.xml?rev=901959&r1=901958&r2=901959&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/ivy.xml (original)
+++ hadoop/avro/trunk/lang/java/ivy.xml Fri Jan 22 01:28:00 2010
@@ -59,6 +59,8 @@
<dependency org="commons-lang" name="commons-lang" rev="2.4" />
<dependency org="org.apache.maven" name="maven-ant-tasks" rev="2.0.9"
conf="build->default"/>
+ <dependency org="net.sf.jopt-simple" name="jopt-simple" rev="3.2"
+ conf="test->default;tools->default"/>
</dependencies>
</ivy-module>
Added: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java?rev=901959&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java (added)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java Fri Jan 22 01:28:00 2010
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.file;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.avro.io.Decoder;
+
+/**
+ * Interface for Avro-supported compression codecs for data files.
+ *
+ * This is currently exclusively an internal-facing API.
+ */
+abstract class Codec {
+ /** Name of the codec; written to the file's metadata. */
+ abstract String getName();
+ /** Compresses the input data into out. */
+ abstract void compress(ByteArrayOutputStream data, OutputStream out) throws IOException;
+ /** Returns a decoder on the uncompressed stream. */
+ abstract Decoder decompress(InputStream in, Decoder vin) throws IOException;
+}
Added: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/CodecFactory.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/CodecFactory.java?rev=901959&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/CodecFactory.java (added)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/CodecFactory.java Fri Jan 22 01:28:00 2010
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.file;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.zip.Deflater;
+
+import org.apache.avro.AvroRuntimeException;
+
+/** Encapsulates the ability to specify and configure a compression codec. */
+public abstract class CodecFactory {
+ /** Null codec, for no compression. */
+ public static CodecFactory nullCodec() {
+ return NullCodec.OPTION;
+ };
+
+ /** Deflate codec, with specific compression.
+ * compressionLevel should be between 1 and 9, inclusive. */
+ public static CodecFactory deflateCodec(int compressionLevel) {
+ return new DeflateCodec.Option(compressionLevel);
+ };
+
+ /** Creates internal Codec. */
+ protected abstract Codec createInstance();
+
+ /** Mapping of codec names (as stored in file metadata) to factories.
+ * Note that codec options (like compression level) are currently
+ * not recoverable from the name alone. */
+ private static final Map<String, CodecFactory> REGISTERED =
+ new HashMap<String, CodecFactory>();
+
+ private static final int DEFAULT_DEFLATE_LEVEL = Deflater.DEFAULT_COMPRESSION;
+
+ static {
+ addCodec("null", nullCodec());
+ addCodec("deflate", deflateCodec(DEFAULT_DEFLATE_LEVEL));
+ }
+
+ /** Maps a codec name to a CodecFactory. */
+ public static CodecFactory fromString(String s) {
+ CodecFactory o = REGISTERED.get(s);
+ if (o == null) {
+ throw new AvroRuntimeException("Unrecognized codec: " + s);
+ }
+ return o;
+ }
+
+ /** Adds a new codec implementation. If name already had
+ * a codec associated with it, returns the previous codec. */
+ public static CodecFactory addCodec(String name, CodecFactory c) {
+ return REGISTERED.put(name, c);
+ }
+
+}
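
A short usage sketch for the factory above; "null" and "deflate" are the two
names registered in the static block, and unknown names fail fast:

    CodecFactory none   = CodecFactory.nullCodec();
    CodecFactory fast   = CodecFactory.deflateCodec(1);       // favor speed
    CodecFactory small  = CodecFactory.deflateCodec(9);       // favor size
    CodecFactory byName = CodecFactory.fromString("deflate"); // default level
    // CodecFactory.fromString("lzo") would throw AvroRuntimeException.
    // addCodec(name, factory) registers a new name, returning any previous
    // factory bound to it.
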
Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java?rev=901959&r1=901958&r2=901959&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java Fri Jan 22 01:28:00 2010
@@ -22,17 +22,17 @@
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
+import java.util.NoSuchElementException;
-import org.apache.avro.Schema;
import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
-import org.apache.avro.io.BinaryDecoder;
/** Streaming access to files written by {@link DataFileWriter}. Use {@link
* DataFileReader} for file-based input.
@@ -43,14 +43,20 @@
private Schema schema;
private DatumReader<D> reader;
+ /** Raw input stream. */
final InputStream in;
+ /** Decoder on raw input stream. (Used for metadata.) */
final Decoder vin;
+ /** Secondary decoder, for datums.
+ * (Different from vin for compressed segments.) */
+ Decoder datumIn = null;
Map<String,byte[]> meta = new HashMap<String,byte[]>();
long blockRemaining; // # entries remaining in block
byte[] sync = new byte[DataFileConstants.SYNC_SIZE];
byte[] syncBuffer = new byte[DataFileConstants.SYNC_SIZE];
+ private Codec codec;
/** Construct a reader for an input stream. For file-based input, use {@link
* DataFileReader}. This performs no buffering, for good performance, be
@@ -83,19 +89,25 @@
}
vin.readFixed(sync); // read sync
- String codec = getMetaString(DataFileConstants.CODEC);
- if (codec != null && ! codec.equals(DataFileConstants.NULL_CODEC)) {
- throw new IOException("Unknown codec: " + codec);
- }
+ this.codec = resolveCodec();
this.schema = Schema.parse(getMetaString(DataFileConstants.SCHEMA));
this.reader = reader;
reader.setSchema(schema);
}
-
+
+ private Codec resolveCodec() {
+ String codecStr = getMetaString(DataFileConstants.CODEC);
+ if (codecStr != null) {
+ return CodecFactory.fromString(codecStr).createInstance();
+ } else {
+ return CodecFactory.nullCodec().createInstance();
+ }
+ }
+
/** Return the schema used in this file. */
public Schema getSchema() { return schema; }
-
+
/** Return the value of a metadata property. */
public byte[] getMeta(String key) {
return meta.get(key);
@@ -125,8 +137,10 @@
/** True if more entries remain in this file. */
public boolean hasNext() {
try {
- if (blockRemaining == 0)
+ if (blockRemaining == 0) {
blockRemaining = vin.readLong(); // read block count
+ datumIn = codec.decompress(in, vin);
+ }
return blockRemaining != 0;
} catch (EOFException e) { // at EOF
return false;
@@ -153,7 +167,7 @@
public D next(D reuse) throws IOException {
if (!hasNext())
throw new NoSuchElementException();
- D result = reader.read(reuse, vin);
+ D result = reader.read(reuse, datumIn);
if (--blockRemaining == 0)
skipSync();
return result;
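
With this change the reader draws each block's datums from a codec-supplied
decoder (datumIn) while block counts and metadata still come from vin;
callers see no API difference. A minimal reading sketch against the
stream-based API (the file name is hypothetical):

    DataFileStream<Object> stream = new DataFileStream<Object>(
        new FileInputStream("events.avro"),   // any InputStream works
        new GenericDatumReader<Object>());
    // The file's "codec" metadata selects the decompressor automatically.
    while (stream.hasNext()) {
      Object datum = stream.next();
      // ... process datum ...
    }
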
Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java?rev=901959&r1=901958&r2=901959&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java Fri Jan 22 01:28:00 2010
@@ -20,29 +20,30 @@
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.FilterOutputStream;
import java.io.Flushable;
import java.io.IOException;
import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
-import java.io.File;
-import java.io.FileDescriptor;
-import java.io.FileOutputStream;
-import java.io.FileNotFoundException;
import java.io.RandomAccessFile;
+import java.io.UnsupportedEncodingException;
import java.nio.channels.FileChannel;
import java.rmi.server.UID;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
-import org.apache.avro.Schema;
import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.generic.GenericDatumReader;
/** Stores in a file a sequence of data conforming to a schema. The schema is
* stored in the file with the data. Each datum in a file is of the same
@@ -70,6 +71,13 @@
private int syncInterval = DataFileConstants.DEFAULT_SYNC_INTERVAL;
private boolean isOpen;
+ private Codec codec;
+
+ private static final HashSet<String> RESERVED_META = new HashSet<String>();
+ static {
+ RESERVED_META.add("codec");
+ RESERVED_META.add("schema");
+ }
/** Construct a writer, not yet open. */
public DataFileWriter(DatumWriter<D> dout) {
@@ -82,6 +90,17 @@
private void assertNotOpen() {
if (isOpen) throw new AvroRuntimeException("already open");
}
+
+ /**
+ * Configures this writer to use the given codec.
+ * May not be reset after writes have begun.
+ */
+ public DataFileWriter<D> setCodec(CodecFactory c) {
+ assertNotOpen();
+ this.codec = c.createInstance();
+ setMetaInternal("codec", codec.getName());
+ return this;
+ }
/** Set the synchronization interval for this file, in bytes. */
public DataFileWriter<D> setSyncInterval(int syncInterval) {
@@ -100,7 +119,7 @@
assertNotOpen();
this.schema = schema;
- setMeta(DataFileConstants.SCHEMA, schema.toString());
+ setMetaInternal(DataFileConstants.SCHEMA, schema.toString());
this.sync = generateSync();
init(outs);
@@ -134,6 +153,13 @@
this.schema = reader.getSchema();
this.sync = reader.sync;
this.meta.putAll(reader.meta);
+ byte[] codecBytes = this.meta.get("codec");
+ if (codecBytes != null) {
+ String strCodec = new String(codecBytes, "UTF-8");
+ this.codec = CodecFactory.fromString(strCodec).createInstance();
+ } else {
+ this.codec = CodecFactory.nullCodec().createInstance();
+ }
FileChannel channel = raf.getChannel(); // seek to end
channel.position(channel.size());
@@ -144,11 +170,14 @@
}
private void init(OutputStream outs) throws IOException {
- this.buffer = new ByteArrayOutputStream(syncInterval*2);
- this.bufOut = new BinaryEncoder(buffer);
this.out = new BufferedFileOutputStream(outs);
this.vout = new BinaryEncoder(out);
dout.setSchema(schema);
+ this.buffer = new ByteArrayOutputStream(syncInterval*2);
+ if (this.codec == null) {
+ this.codec = CodecFactory.nullCodec().createInstance();
+ }
+ this.bufOut = new BinaryEncoder(buffer);
this.isOpen = true;
}
@@ -163,12 +192,28 @@
}
}
- /** Set a metadata property. */
- public DataFileWriter<D> setMeta(String key, byte[] value) {
+ private DataFileWriter<D> setMetaInternal(String key, byte[] value) {
assertNotOpen();
meta.put(key, value);
return this;
}
+
+ public DataFileWriter<D> setMetaInternal(String key, String value) {
+ try {
+ return setMetaInternal(key, value.getBytes("UTF-8"));
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** Set a metadata property. */
+ public DataFileWriter<D> setMeta(String key, byte[] value) {
+ if (RESERVED_META.contains(key)) {
+ throw new AvroRuntimeException("Cannot set reserved meta key: " + key);
+ }
+ return setMetaInternal(key, value);
+ }
+
/** Set a metadata property. */
public DataFileWriter<D> setMeta(String key, String value) {
try {
@@ -194,7 +239,7 @@
private void writeBlock() throws IOException {
if (blockCount > 0) {
vout.writeLong(blockCount);
- buffer.writeTo(out);
+ codec.compress(buffer, out);
buffer.reset();
blockCount = 0;
out.write(sync);
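
Two behaviors above merit a sketch: setCodec() must precede create() (it
asserts the writer is not yet open), and appendTo() restores the codec from
the existing file's "codec" metadata, at the default level, since options
are not stored. Schema and datum variables here are assumed:

    DataFileWriter<Object> w =
        new DataFileWriter<Object>(new GenericDatumWriter<Object>());
    w.setCodec(CodecFactory.deflateCodec(6)); // must come before create()
    w.create(schema, new File("/tmp/data.avro"));
    w.append(datum);
    w.close();

    // Reopening for append resolves "deflate" from the file header, so
    // newly written blocks are compressed as well.
    DataFileWriter<Object> w2 =
        new DataFileWriter<Object>(new GenericDatumWriter<Object>())
            .appendTo(new File("/tmp/data.avro"));
    w2.append(anotherDatum);
    w2.close();
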
Added: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java?rev=901959&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java (added)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java Fri Jan 22 01:28:00 2010
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.file;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.Inflater;
+import java.util.zip.InflaterInputStream;
+
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Implements DEFLATE (RFC1951) compression and decompression.
+ *
+ * Note that there is a distinction between RFC1951 (deflate)
+ * and RFC1950 (zlib). zlib adds an extra 2-byte header
+ * at the front, and a 4-byte checksum at the end. The
+ * code here, by passing "true" as the "nowrap" option to
+ * {@link Inflater} and {@link Deflater}, is using
+ * RFC1951.
+ */
+class DeflateCodec extends Codec {
+
+ static class Option extends CodecFactory {
+ private int compressionLevel;
+
+ public Option(int compressionLevel) {
+ this.compressionLevel = compressionLevel;
+ }
+
+ @Override
+ protected Codec createInstance() {
+ return new DeflateCodec(compressionLevel);
+ }
+ }
+
+ ByteArrayOutputStream compressionBuffer;
+ private Deflater deflater;
+ private int compressionLevel;
+ private Inflater inflater;
+
+ public DeflateCodec(int compressionLevel) {
+ this.compressionLevel = compressionLevel;
+ }
+
+ @Override
+ String getName() {
+ return "deflate";
+ }
+
+ @Override
+ void compress(ByteArrayOutputStream buffer, OutputStream out) throws IOException {
+ if (compressionBuffer == null) {
+ compressionBuffer = new ByteArrayOutputStream(buffer.size());
+ }
+ if (deflater == null) {
+ deflater = new Deflater(compressionLevel, true); // nowrap: raw RFC 1951, per the javadoc above
+ }
+ // Pass output through deflate, and prepend with length of compressed output.
+ DeflaterOutputStream deflaterStream =
+ new DeflaterOutputStream(compressionBuffer, deflater);
+ buffer.writeTo(deflaterStream);
+ deflaterStream.finish();
+ new BinaryEncoder(out).writeLong(compressionBuffer.size());
+ compressionBuffer.writeTo(out);
+ compressionBuffer.reset();
+ deflater.reset();
+ }
+
+ @Override
+ Decoder decompress(InputStream in, Decoder vin) throws IOException {
+ if (inflater == null) {
+ inflater = new Inflater(true); // nowrap: raw RFC 1951, per the javadoc above
+ }
+ long compressedLength = vin.readLong();
+ InputStream uncompressed = new InflaterInputStream(
+ new LengthLimitedInputStream(in, compressedLength),
+ inflater);
+ inflater.reset();
+ return new BinaryDecoder(uncompressed);
+ }
+
+}
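
The LengthLimitedInputStream wrapper in decompress() is what keeps
InflaterInputStream honest: the inflater buffers reads from its underlying
stream, and without the bound it could consume bytes belonging to the
16-byte sync marker that follows each block. The read path in isolation
(fileIn and compressedLength are assumed):

    Inflater inflater = new Inflater(true); // nowrap: raw RFC 1951 data
    InputStream bounded =
        new LengthLimitedInputStream(fileIn, compressedLength);
    InputStream datums = new InflaterInputStream(bounded, inflater);
    // Reads on 'datums' can never advance fileIn past compressedLength,
    // so the sync marker after the block stays available to the reader.
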
Added: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/LengthLimitedInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/LengthLimitedInputStream.java?rev=901959&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/LengthLimitedInputStream.java (added)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/LengthLimitedInputStream.java Fri Jan 22 01:28:00 2010
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.file;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/** Represents a substream of a given maximum length. */
+class LengthLimitedInputStream extends FilterInputStream {
+
+ /** Bytes remaining. */
+ private long remaining;
+
+ protected LengthLimitedInputStream(InputStream in, long maxLength) {
+ super(in);
+ remaining = maxLength;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (remaining > 0) {
+ int v = super.read();
+ if (v != -1) {
+ remaining--;
+ }
+ return v;
+ }
+ return -1;
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+
+ /**
+ * Returns the number of bytes remaining, capped at Integer.MAX_VALUE.
+ */
+ private int remainingInt() {
+ return (int)Math.min(remaining, Integer.MAX_VALUE);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (remaining == 0) {
+ return -1;
+ }
+ if (len > remaining) {
+ len = remainingInt();
+ }
+ int v = super.read(b, off, len);
+ if (v != -1) {
+ remaining -= v;
+ }
+ return v;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return Math.min(super.available(), remainingInt());
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ long v = super.skip(Math.min(remaining, n));
+ remaining -= v;
+ return v;
+ }
+}
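
A small sketch of the wrapper's key property, callable from within the
org.apache.avro.file package (the constructor is protected): reads stop at
the limit, and the wrapped stream keeps everything past it.

    InputStream base = new ByteArrayInputStream(new byte[] {1, 2, 3, 4, 5});
    InputStream limited = new LengthLimitedInputStream(base, 3);
    limited.read();  // 1
    limited.read();  // 2
    limited.read();  // 3
    limited.read();  // -1: limit reached, not a real EOF
    base.read();     // 4: untouched by the wrapper
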
Added: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java?rev=901959&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java (added)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java Fri Jan 22 01:28:00 2010
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.file;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.avro.io.Decoder;
+
+/** Implements the "null" (pass-through) codec. */
+final class NullCodec extends Codec {
+
+ private static final NullCodec INSTANCE = new NullCodec();
+
+ static class Option extends CodecFactory {
+
+ @Override
+ protected Codec createInstance() {
+ return INSTANCE;
+ }
+
+ }
+ /** No options available for NullCodec. */
+ public static final CodecFactory OPTION = new Option();
+
+ @Override
+ String getName() {
+ return "null";
+ }
+
+ @Override
+ void compress(ByteArrayOutputStream buffer, OutputStream out) throws IOException {
+ buffer.writeTo(out);
+ }
+
+ @Override
+ Decoder decompress(InputStream in, Decoder vin) {
+ return vin;
+ }
+}
Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/tool/DataFileWriteTool.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/tool/DataFileWriteTool.java?rev=901959&r1=901958&r2=901959&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/tool/DataFileWriteTool.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/tool/DataFileWriteTool.java Fri Jan 22 01:28:00 2010
@@ -23,7 +23,12 @@
import java.io.PrintStream;
import java.util.List;
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+
import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.generic.GenericDatumReader;
@@ -47,20 +52,31 @@
@Override
public int run(InputStream stdin, PrintStream out, PrintStream err,
List<String> args) throws Exception {
- if (args.size() != 2) {
+
+ OptionParser p = new OptionParser();
+ OptionSpec<String> codec =
+ p.accepts("codec", "Compression codec")
+ .withRequiredArg()
+ .defaultsTo("null")
+ .ofType(String.class);
+ OptionSet opts = p.parse(args.toArray(new String[0]));
+
+ if (opts.nonOptionArguments().size() != 2) {
err.println("Expected 2 args: schema input_file");
+ p.printHelpOn(err);
return 1;
}
-
+
Schema schema = Schema.parse(args.get(0));
DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
-
+
InputStream input = Util.fileOrStdin(args.get(1), stdin);
try {
DataInputStream din = new DataInputStream(input);
DataFileWriter<Object> writer =
- new DataFileWriter<Object>(new GenericDatumWriter<Object>())
- .create(schema, out);
+ new DataFileWriter<Object>(new GenericDatumWriter<Object>());
+ writer.setCodec(CodecFactory.fromString(codec.value(opts)));
+ writer.create(schema, out);
Decoder decoder = new JsonDecoder(schema, din);
Object datum;
while (true) {
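
One usage note on the option parsing above: the tool still reads the schema
and input path positionally via args.get(0) and args.get(1), so in this
revision --codec must follow the two positional arguments, exactly as the
updated test passes them. An invocation sketch (schemaJson is assumed):

    new DataFileWriteTool().run(
        System.in,                // JSON datums ("-" selects stdin below)
        System.out, System.err,
        Arrays.asList(schemaJson, "-", "--codec", "deflate"));
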
Modified: hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFile.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFile.java?rev=901959&r1=901958&r2=901959&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFile.java (original)
+++ hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFile.java Fri Jan 22 01:28:00 2010
@@ -17,6 +17,16 @@
*/
package org.apache.avro;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
@@ -24,23 +34,41 @@
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import java.util.Random;
-import java.io.File;
-import java.io.IOException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+@RunWith(Parameterized.class)
public class TestDataFile {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestDataFile.class);
+
+ CodecFactory codec = null;
+ public TestDataFile(CodecFactory codec) {
+ this.codec = codec;
+ LOG.info("Running with codec: " + codec);
+ }
+
+ @Parameters
+ public static List<Object[]> codecs() {
+ List<Object[]> r = new ArrayList<Object[]>();
+ r.add(new Object[] { null });
+ r.add(new Object[] { CodecFactory.deflateCodec(1) });
+ r.add(new Object[] { CodecFactory.deflateCodec(9) });
+ r.add(new Object[] { CodecFactory.nullCodec() });
+ return r;
+ }
+
private static final int COUNT =
Integer.parseInt(System.getProperty("test.count", "10"));
- private static final boolean VALIDATE =
+ private static final boolean VALIDATE =
!"false".equals(System.getProperty("test.validate", "true"));
private static final File DIR
= new File(System.getProperty("test.dir", "/tmp"));
private static final File DATAFILE_DIR
= new File(System.getProperty("test.dir", "/tmp"));
- private static final File FILE = new File(DIR, "test.avro");
private static final long SEED = System.currentTimeMillis();
private static final String SCHEMA_JSON =
@@ -49,12 +77,19 @@
+"{\"name\":\"longField\", \"type\":\"long\"}]}";
private static final Schema SCHEMA = Schema.parse(SCHEMA_JSON);
+ private File makeFile() {
+ return new File(DIR, "test-" + codec + ".avro");
+ }
+
@Test
public void testGenericWrite() throws IOException {
DataFileWriter<Object> writer =
new DataFileWriter<Object>(new GenericDatumWriter<Object>())
- .setSyncInterval(100)
- .create(SCHEMA, FILE);
+ .setSyncInterval(100);
+ if (codec != null) {
+ writer.setCodec(codec);
+ }
+ writer.create(SCHEMA, makeFile());
try {
int count = 0;
for (Object datum : new RandomData(SCHEMA, COUNT, SEED)) {
@@ -70,7 +105,7 @@
@Test
public void testGenericRead() throws IOException {
DataFileReader<Object> reader =
- new DataFileReader<Object>(FILE, new GenericDatumReader<Object>());
+ new DataFileReader<Object>(makeFile(), new GenericDatumReader<Object>());
try {
Object datum = null;
if (VALIDATE) {
@@ -90,12 +125,13 @@
@Test
public void testSplits() throws IOException {
+ File file = makeFile();
DataFileReader<Object> reader =
- new DataFileReader<Object>(FILE, new GenericDatumReader<Object>());
+ new DataFileReader<Object>(file, new GenericDatumReader<Object>());
Random rand = new Random(SEED);
try {
int splits = 10; // number of splits
- int length = (int)FILE.length(); // length of file
+ int length = (int)file.length(); // length of file
int end = length; // end of split
int remaining = end; // bytes remaining
int count = 0; // count of entries
@@ -117,10 +153,11 @@
@Test
public void testGenericAppend() throws IOException {
- long start = FILE.length();
+ File file = makeFile();
+ long start = file.length();
DataFileWriter<Object> writer =
new DataFileWriter<Object>(new GenericDatumWriter<Object>())
- .appendTo(FILE);
+ .appendTo(file);
try {
for (Object datum : new RandomData(SCHEMA, COUNT, SEED+1)) {
writer.append(datum);
@@ -129,7 +166,7 @@
writer.close();
}
DataFileReader<Object> reader =
- new DataFileReader<Object>(FILE, new GenericDatumReader<Object>());
+ new DataFileReader<Object>(file, new GenericDatumReader<Object>());
try {
reader.seek(start);
Object datum = null;
@@ -162,7 +199,7 @@
Schema projection = null;
if (args.length > 1)
projection = Schema.parse(new File(args[1]));
- TestDataFile tester = new TestDataFile();
+ TestDataFile tester = new TestDataFile(null);
tester.readFile(input, new GenericDatumReader<Object>(null, projection));
long start = System.currentTimeMillis();
for (int i = 0; i < 4; i++)
@@ -195,7 +232,7 @@
// }
private void readFiles(DatumReader<Object> datumReader) throws IOException {
- TestDataFile test = new TestDataFile();
+ TestDataFile test = new TestDataFile(null);
for (File f : DATAFILE_DIR.listFiles())
test.readFile(f, datumReader);
}
Added: hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileDeflate.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileDeflate.java?rev=901959&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileDeflate.java (added)
+++ hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileDeflate.java Fri Jan 22 01:28:00 2010
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.avro.Schema.Type;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.util.Utf8;
+import org.junit.Test;
+
+/** Simple test of DataFileWriter and DataFileStream with deflate codec. */
+public class TestDataFileDeflate {
+ @Test
+ public void testWriteAndRead() throws IOException {
+ Schema schema = Schema.create(Type.STRING);
+
+ // Write it
+ DataFileWriter<Utf8> w = new DataFileWriter<Utf8>(new GenericDatumWriter<Utf8>(schema));
+ w.setCodec(CodecFactory.deflateCodec(6));
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ w.create(schema, baos);
+ w.append(new Utf8("hello world"));
+ w.append(new Utf8("hello moon"));
+ w.sync();
+ w.append(new Utf8("bye bye world"));
+ w.append(new Utf8("bye bye moon"));
+ w.close();
+
+ // Read it
+ DataFileStream<Utf8> r = new DataFileStream<Utf8>(
+ new ByteArrayInputStream(baos.toByteArray()),
+ new GenericDatumReader<Utf8>(schema));
+ assertEquals("hello world", r.next().toString());
+ assertEquals("hello moon", r.next().toString());
+ assertEquals("bye bye world", r.next().toString());
+ assertEquals("bye bye moon", r.next().toString());
+ assertFalse(r.hasNext());
+ }
+}
Added: hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/file/TestLengthLimitedInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/file/TestLengthLimitedInputStream.java?rev=901959&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/file/TestLengthLimitedInputStream.java (added)
+++ hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/file/TestLengthLimitedInputStream.java Fri Jan 22 01:28:00 2010
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.file;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestLengthLimitedInputStream {
+ InputStream raw;
+
+ @Before
+ public void setupRawStream() {
+ byte[] buf = new byte[128];
+ for (int i = 0; i < 128; ++i) {
+ buf[i] = (byte)i;
+ }
+ raw = new ByteArrayInputStream(buf);
+ }
+
+ @Test
+ public void testAvailable() throws IOException {
+ InputStream is = new LengthLimitedInputStream(raw, 10);
+ assertEquals(10, is.available());
+ is.skip(100);
+ assertEquals(0, is.available());
+ }
+
+ @Test
+ public void testRead() throws IOException {
+ InputStream is = new LengthLimitedInputStream(raw, 10);
+ byte[] x = new byte[12];
+ assertEquals(0, is.read());
+ assertEquals(9, is.read(x));
+ assertEquals(-1, is.read(x));
+ assertEquals(x[8], 9);
+ }
+}
Modified: hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/tool/TestDataFileTools.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/tool/TestDataFileTools.java?rev=901959&r1=901958&r2=901959&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/tool/TestDataFileTools.java (original)
+++ hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/tool/TestDataFileTools.java Fri Jan 22 01:28:00 2010
@@ -25,7 +25,10 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import org.apache.avro.AvroTestUtil;
import org.apache.avro.Schema;
@@ -93,16 +96,30 @@
}
@Test
+ public void testWriteWithDeflate() throws Exception {
+ testWrite("deflate", Arrays.asList("--codec", "deflate"), "deflate");
+ }
+
+ @Test
public void testWrite() throws Exception {
+ testWrite("plain", Collections.<String>emptyList(), "null");
+ }
+
+ public void testWrite(String name, List<String> extra, String expectedCodec)
+ throws Exception {
File outFile = AvroTestUtil.tempFile(
- TestDataFileTools.class + ".testWrite." + ".avro");
+ TestDataFileTools.class + ".testWrite." + name + ".avro");
FileOutputStream fout = new FileOutputStream(outFile);
PrintStream out = new PrintStream(fout);
+ List<String> args = new ArrayList<String>();
+ args.add(schema.toString());
+ args.add("-");
+ args.addAll(extra);
new DataFileWriteTool().run(
new StringInputStream(jsonData),
new PrintStream(out), // stdout
null, // stderr
- Arrays.asList(schema.toString(), "-"));
+ args);
out.close();
fout.close();
@@ -116,6 +133,7 @@
}
assertEquals(COUNT, i);
assertEquals(schema, fileReader.getSchema());
+ assertEquals(expectedCodec, fileReader.getMetaString("codec"));
}
@Test
@@ -149,7 +167,7 @@
DataFileReader<Object> fileReader =
new DataFileReader<Object>(outFile,reader);
int i = 0;
- for (Object datum : fileReader) {
+ for (@SuppressWarnings("unused") Object datum : fileReader) {
i++;
}
return i;