You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by ph...@apache.org on 2010/01/22 02:28:02 UTC

svn commit: r901959 - in /hadoop/avro/trunk: ./ doc/src/content/xdocs/ lang/java/ lang/java/src/java/org/apache/avro/file/ lang/java/src/java/org/apache/avro/tool/ lang/java/src/test/java/org/apache/avro/ lang/java/src/test/java/org/apache/avro/file/ l...

Author: philz
Date: Fri Jan 22 01:28:00 2010
New Revision: 901959

URL: http://svn.apache.org/viewvc?rev=901959&view=rev
Log:
AVRO-135. Add compression to data files. (philz)

Added:
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/CodecFactory.java
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/LengthLimitedInputStream.java
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java
    hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileDeflate.java
    hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/file/
    hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/file/TestLengthLimitedInputStream.java
Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/doc/src/content/xdocs/spec.xml
    hadoop/avro/trunk/lang/java/build.xml
    hadoop/avro/trunk/lang/java/ivy.xml
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/tool/DataFileWriteTool.java
    hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFile.java
    hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/tool/TestDataFileTools.java

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=901959&r1=901958&r2=901959&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Fri Jan 22 01:28:00 2010
@@ -69,6 +69,8 @@
     AVRO-346. Add function to validate a datum against a schema. (massie)
 
     AVRO-306. Add Ruby implementation. (Jeff Hodges via cutting)
+    
+    AVRO-135. Add compression to data files. (philz)
 
   IMPROVEMENTS
 

Modified: hadoop/avro/trunk/doc/src/content/xdocs/spec.xml
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/doc/src/content/xdocs/spec.xml?rev=901959&r1=901958&r2=901959&view=diff
==============================================================================
--- hadoop/avro/trunk/doc/src/content/xdocs/spec.xml (original)
+++ hadoop/avro/trunk/doc/src/content/xdocs/spec.xml Fri Jan 22 01:28:00 2010
@@ -624,10 +624,10 @@
 	<li><strong>schema</strong> contains the schema of objects
 	stored in the file, as JSON data (required).</li>
 	<li><strong>codec</strong> the name of the compression codec
-	used to compress blocks, as a string. The only value for codec
-	currently supported is "null" (meaning no compression is
-	performed).  If codec is absent, it is assumed to be
-	"null".</li>
+	used to compress blocks, as a string.  Implementations
+        are required to support the following codecs: "null" and "deflate".  
+        If codec is absent, it is assumed to be "null".  The codecs
+        are described with more detail below.</li>
       </ul>
 
       <p>A file header is thus described by the following schema:</p>
@@ -649,6 +649,26 @@
 	<li>The file's 16-byte sync marker.</li>
       </ul>
 
+      <section>
+      <title>Required Codecs</title>
+        <section>
+        <title>null</title>
+        <p>The "null" codec simply passes through data uncompressed.</p>
+        </section>
+
+        <section>
+        <title>deflate</title>
+        <p>The "deflate" codec writes the length of the compressed
+        data (as an Avro-encoded long), followed by data compressed using the
+        deflate algorithm as specified in 
+        <a href="http://www.isi.edu/in-notes/rfc1951.txt">RFC 1951</a>,
+        and typically implemented using the zlib library.  Note that this
+        format (unlike the "zlib format" in RFC 1950) does not have a
+        checksum.
+        </p>
+        </section>
+      </section>
+
     </section>
 
     <section>

Modified: hadoop/avro/trunk/lang/java/build.xml
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/build.xml?rev=901959&r1=901958&r2=901959&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/build.xml (original)
+++ hadoop/avro/trunk/lang/java/build.xml Fri Jan 22 01:28:00 2010
@@ -250,7 +250,7 @@
     </checkstyle>
   </target>
 
-  <target name="compile-test-java" depends="ivy-retrieve-test,compile">
+  <target name="compile-test-java" depends="ivy-retrieve-test,ivy-retrieve-tools,compile">
     <java-avro-compiler src="${test.schemata.dir}"
 			generated="${test.java.generated.dir}"
 			dest="${test.java.generated.classes}"

Modified: hadoop/avro/trunk/lang/java/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/ivy.xml?rev=901959&r1=901958&r2=901959&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/ivy.xml (original)
+++ hadoop/avro/trunk/lang/java/ivy.xml Fri Jan 22 01:28:00 2010
@@ -59,6 +59,8 @@
     <dependency org="commons-lang" name="commons-lang" rev="2.4" />
     <dependency org="org.apache.maven" name="maven-ant-tasks" rev="2.0.9"
         conf="build->default"/>
+    <dependency org="net.sf.jopt-simple" name="jopt-simple" rev="3.2"
+        conf="test->default;tools->default"/>
   </dependencies>
 
 </ivy-module>

Added: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java?rev=901959&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java (added)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java Fri Jan 22 01:28:00 2010
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.file;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.avro.io.Decoder;
+
+/** 
+ * Interface for Avro-supported compression codecs for data files.
+ *
+ * This is currently exclusively an internal-facing API.
+ */
+abstract class Codec {
+  /** Name of the codec; written to the file's metadata. */
+  abstract String getName();
+  /** Compresses the input data into out. */
+  abstract void compress(ByteArrayOutputStream data, OutputStream out) throws IOException;
+  /** Returns a decoder on the uncompressed stream. */
+  abstract Decoder decompress(InputStream in, Decoder vin) throws IOException;
+}

Added: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/CodecFactory.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/CodecFactory.java?rev=901959&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/CodecFactory.java (added)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/CodecFactory.java Fri Jan 22 01:28:00 2010
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.file;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.zip.Deflater;
+
+import org.apache.avro.AvroRuntimeException;
+
+/** Encapsulates the ability to specify and configure a compression codec. */
+public abstract class CodecFactory {
+  /** Null codec, for no compression. */
+  public static CodecFactory nullCodec() { 
+    return NullCodec.OPTION; 
+  };
+  
+  /** Deflate codec, with specific compression.
+   * compressionLevel should be between 1 and 9, inclusive. */
+  public static CodecFactory deflateCodec(int compressionLevel) { 
+    return new DeflateCodec.Option(compressionLevel); 
+  };
+  
+  /** Creates internal Codec. */
+  protected abstract Codec createInstance();
+  
+  /** Mapping of string names (stored as metas) and codecs. 
+   * Note that currently options (like compression level)
+   * are not recoverable. */
+  private static final Map<String, CodecFactory> REGISTERED = 
+    new HashMap<String, CodecFactory>();
+
+  private static final int DEFAULT_DEFLATE_LEVEL = Deflater.DEFAULT_COMPRESSION;
+
+  static {
+    addCodec("null", nullCodec());
+    addCodec("deflate", deflateCodec(DEFAULT_DEFLATE_LEVEL));
+  }
+
+  /** Maps a codec name into a CodecOption. */
+  public static CodecFactory fromString(String s) {
+    CodecFactory o = REGISTERED.get(s);
+    if (o == null) {
+      throw new AvroRuntimeException("Unrecognized codec: " + s);
+    }
+    return o;
+  }
+  
+  /** Adds a new codec implementation.  If name already had
+   * a codec associated with it, returns the previous codec. */
+  public static CodecFactory addCodec(String name, CodecFactory c) {
+    return REGISTERED.put(name, c);
+  }
+  
+}

Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java?rev=901959&r1=901958&r2=901959&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java Fri Jan 22 01:28:00 2010
@@ -22,17 +22,17 @@
 import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.NoSuchElementException;
 
-import org.apache.avro.Schema;
 import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
-import org.apache.avro.io.BinaryDecoder;
 
 /** Streaming access to files written by {@link DataFileWriter}.  Use {@link
  * DataFileReader} for file-based input.
@@ -43,14 +43,20 @@
   private Schema schema;
   private DatumReader<D> reader;
 
+  /** Raw input stream. */
   final InputStream in;
+  /** Decoder on raw input stream.  (Used for metadata.) */
   final Decoder vin;
+  /** Secondary decoder, for datums.
+   *  (Different than vin for compressed segments.) */
+  Decoder datumIn = null;
 
   Map<String,byte[]> meta = new HashMap<String,byte[]>();
 
   long blockRemaining;                          // # entries remaining in block
   byte[] sync = new byte[DataFileConstants.SYNC_SIZE];
   byte[] syncBuffer = new byte[DataFileConstants.SYNC_SIZE];
+  private Codec codec;
 
   /** Construct a reader for an input stream.  For file-based input, use {@link
    * DataFileReader}.  This performs no buffering, for good performance, be
@@ -83,19 +89,25 @@
     }
     vin.readFixed(sync);                          // read sync
 
-    String codec = getMetaString(DataFileConstants.CODEC);
-    if (codec != null && ! codec.equals(DataFileConstants.NULL_CODEC)) {
-      throw new IOException("Unknown codec: " + codec);
-    }
+    this.codec = resolveCodec();
     this.schema = Schema.parse(getMetaString(DataFileConstants.SCHEMA));
     this.reader = reader;
 
     reader.setSchema(schema);
   }
-  
+
+  private Codec resolveCodec() {
+    String codecStr = getMetaString(DataFileConstants.CODEC);
+    if (codecStr != null) {
+      return CodecFactory.fromString(codecStr).createInstance();
+    } else {
+      return CodecFactory.nullCodec().createInstance();
+    }
+  }
+
   /** Return the schema used in this file. */
   public Schema getSchema() { return schema; }
-  
+
   /** Return the value of a metadata property. */
   public byte[] getMeta(String key) {
     return meta.get(key);
@@ -125,8 +137,10 @@
   /** True if more entries remain in this file. */
   public boolean hasNext() {
     try {
-      if (blockRemaining == 0)
+      if (blockRemaining == 0) {
         blockRemaining = vin.readLong();          // read block count
+        datumIn = codec.decompress(in, vin);
+      }
       return blockRemaining != 0;
     } catch (EOFException e) {                    // at EOF
       return false;
@@ -153,7 +167,7 @@
   public D next(D reuse) throws IOException {
     if (!hasNext())
       throw new NoSuchElementException();
-    D result = reader.read(reuse, vin);
+    D result = reader.read(reuse, datumIn);
     if (--blockRemaining == 0)
       skipSync();
     return result;

Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java?rev=901959&r1=901958&r2=901959&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java Fri Jan 22 01:28:00 2010
@@ -20,29 +20,30 @@
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.FilterOutputStream;
 import java.io.Flushable;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
-import java.io.File;
-import java.io.FileDescriptor;
-import java.io.FileOutputStream;
-import java.io.FileNotFoundException;
 import java.io.RandomAccessFile;
+import java.io.UnsupportedEncodingException;
 import java.nio.channels.FileChannel;
 import java.rmi.server.UID;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 
-import org.apache.avro.Schema;
 import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.Encoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.generic.GenericDatumReader;
 
 /** Stores in a file a sequence of data conforming to a schema.  The schema is
  * stored in the file with the data.  Each datum in a file is of the same
@@ -70,6 +71,13 @@
   private int syncInterval = DataFileConstants.DEFAULT_SYNC_INTERVAL;
 
   private boolean isOpen;
+  private Codec codec;
+  
+  private static final HashSet<String> RESERVED_META = new HashSet<String>();
+  static {
+    RESERVED_META.add("codec");
+    RESERVED_META.add("schema");
+  }
 
   /** Construct a writer, not yet open. */
   public DataFileWriter(DatumWriter<D> dout) {
@@ -82,6 +90,17 @@
   private void assertNotOpen() {
     if (isOpen) throw new AvroRuntimeException("already open");
   }
+  
+  /** 
+   * Configures this writer to use the given codec. 
+   * May not be reset after writes have begun.
+   */
+  public DataFileWriter<D> setCodec(CodecFactory c) {
+    assertNotOpen();
+    this.codec = c.createInstance();
+    setMetaInternal("codec", codec.getName());
+    return this;
+  }
 
   /** Set the synchronization interval for this file, in bytes. */
   public DataFileWriter<D> setSyncInterval(int syncInterval) {
@@ -100,7 +119,7 @@
     assertNotOpen();
 
     this.schema = schema;
-    setMeta(DataFileConstants.SCHEMA, schema.toString());
+    setMetaInternal(DataFileConstants.SCHEMA, schema.toString());
     this.sync = generateSync();
 
     init(outs);
@@ -134,6 +153,13 @@
     this.schema = reader.getSchema();
     this.sync = reader.sync;
     this.meta.putAll(reader.meta);
+    byte[] codecBytes = this.meta.get("codec");
+    if (codecBytes != null) {
+      String strCodec = new String(codecBytes, "UTF-8");
+      this.codec = CodecFactory.fromString(strCodec).createInstance();
+    } else {
+      this.codec = CodecFactory.nullCodec().createInstance();
+    }
 
     FileChannel channel = raf.getChannel();       // seek to end
     channel.position(channel.size());
@@ -144,11 +170,14 @@
   }
 
   private void init(OutputStream outs) throws IOException {
-    this.buffer = new ByteArrayOutputStream(syncInterval*2);
-    this.bufOut = new BinaryEncoder(buffer);
     this.out = new BufferedFileOutputStream(outs);
     this.vout = new BinaryEncoder(out);
     dout.setSchema(schema);
+    this.buffer = new ByteArrayOutputStream(syncInterval*2);
+    if (this.codec == null) {
+      this.codec = CodecFactory.nullCodec().createInstance();
+    }
+    this.bufOut = new BinaryEncoder(buffer);
     this.isOpen = true;
   }
 
@@ -163,12 +192,28 @@
     }
   }
 
-  /** Set a metadata property. */
-  public DataFileWriter<D> setMeta(String key, byte[] value) {
+  private DataFileWriter<D> setMetaInternal(String key, byte[] value) {
     assertNotOpen();
     meta.put(key, value);
     return this;
   }
+  
+  public DataFileWriter<D> setMetaInternal(String key, String value) {
+    try {
+      return setMetaInternal(key, value.getBytes("UTF-8"));
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Set a metadata property. */
+  public DataFileWriter<D> setMeta(String key, byte[] value) {
+    if (RESERVED_META.contains(key)) {
+      throw new AvroRuntimeException("Cannot set reserved meta key: " + key);
+    }
+    return setMetaInternal(key, value);
+  }
+  
   /** Set a metadata property. */
   public DataFileWriter<D> setMeta(String key, String value) {
     try {
@@ -194,7 +239,7 @@
   private void writeBlock() throws IOException {
     if (blockCount > 0) {
       vout.writeLong(blockCount);
-      buffer.writeTo(out);
+      codec.compress(buffer, out);
       buffer.reset();
       blockCount = 0;
       out.write(sync);

Added: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java?rev=901959&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java (added)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java Fri Jan 22 01:28:00 2010
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.file;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.Inflater;
+import java.util.zip.InflaterInputStream;
+
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.Decoder;
+
+/** 
+ * Implements DEFLATE (RFC1951) compression and decompression. 
+ *
+ * Note that there is a distinction between RFC1951 (deflate)
+ * and RFC1950 (zlib).  zlib adds an extra 2-byte header
+ * at the front, and a 4-byte checksum at the end.  The
+ * code here, by passing "true" as the "nowrap" option to
+ * {@link Inflater} and {@link Deflater}, is using
+ * RFC1951.
+ */
+class DeflateCodec extends Codec {
+  
+  static class Option extends CodecFactory {
+    private int compressionLevel;
+
+    public Option(int compressionLevel) {
+      this.compressionLevel = compressionLevel;
+    }
+
+    @Override
+    protected Codec createInstance() {
+      return new DeflateCodec(compressionLevel);
+    }
+  }
+
+  ByteArrayOutputStream compressionBuffer;
+  private Deflater deflater;
+  private int compressionLevel;
+  private Inflater inflater;
+
+  public DeflateCodec(int compressionLevel) {
+    this.compressionLevel = compressionLevel;
+  }
+
+  @Override
+  String getName() {
+    return "deflate";
+  }
+
+  @Override
+  void compress(ByteArrayOutputStream buffer, OutputStream out) throws IOException {
+    if (compressionBuffer == null) {
+      compressionBuffer = new ByteArrayOutputStream(buffer.size());
+    }
+    if (deflater == null) {
+      deflater = new Deflater(compressionLevel, false);
+    }
+    // Pass output through deflate, and prepend with length of compressed output.
+    DeflaterOutputStream deflaterStream = 
+      new DeflaterOutputStream(compressionBuffer, deflater);
+    buffer.writeTo(deflaterStream);
+    deflaterStream.finish();
+    new BinaryEncoder(out).writeLong(compressionBuffer.size());
+    compressionBuffer.writeTo(out);
+    compressionBuffer.reset();
+    deflater.reset();
+  }
+
+  @Override
+  Decoder decompress(InputStream in, Decoder vin) throws IOException {
+    if (inflater == null) {
+      inflater = new Inflater(false);
+    }
+    long compressedLength = vin.readLong();
+    InputStream uncompressed = new InflaterInputStream(
+        new LengthLimitedInputStream(in, compressedLength),
+        inflater);
+    inflater.reset();
+    return new BinaryDecoder(uncompressed);
+  }
+
+}

Added: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/LengthLimitedInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/LengthLimitedInputStream.java?rev=901959&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/LengthLimitedInputStream.java (added)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/LengthLimitedInputStream.java Fri Jan 22 01:28:00 2010
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.file;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/** Represents a substream of certain length. */
+class LengthLimitedInputStream extends FilterInputStream {
+
+  /** Bytes remaining. */
+  private long remaining;
+
+  protected LengthLimitedInputStream(InputStream in, long maxLength) {
+    super(in);
+    remaining = maxLength;
+  }
+
+  @Override
+  public int read() throws IOException {
+    if (remaining > 0) {
+      int v = super.read();
+      if (v != -1) {
+        remaining--;
+      }
+      return v;
+    }
+    return -1;
+  }
+
+  @Override
+  public int read(byte[] b) throws IOException {
+    return read(b, 0, b.length);
+  }
+
+  /**
+   * Returns at most Integer.MAX_VALUE.
+   */
+  private int remainingInt() {
+    return (int)Math.min(remaining, Integer.MAX_VALUE);
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    if (remaining == 0) {
+      return -1;
+    }
+    if (len > remaining) {
+      len = remainingInt();
+    }
+    int v = super.read(b, off, len);
+    if (v != -1) {
+      remaining -= v;
+    }
+    return v;
+  }
+
+  @Override
+  public int available() throws IOException {
+    return Math.min(super.available(), remainingInt());
+  }
+
+  @Override
+  public long skip(long n) throws IOException {
+    long v = super.skip(Math.min(remaining, n));
+    remaining -= v;
+    return v;
+  }
+}

Added: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java?rev=901959&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java (added)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java Fri Jan 22 01:28:00 2010
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.file;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.avro.io.Decoder;
+
+/** Implements "null" (pass through) codec. */
+final class NullCodec extends Codec {
+  
+  private static final NullCodec INSTANCE = new NullCodec();
+
+  static class Option extends CodecFactory {
+
+    @Override
+    protected Codec createInstance() {
+      return INSTANCE;
+    }
+    
+  }
+  /** No options available for NullCodec. */
+  public static final CodecFactory OPTION = new Option();
+
+  @Override
+  String getName() {
+    return "null";
+  }
+
+  @Override
+  void compress(ByteArrayOutputStream buffer, OutputStream out) throws IOException {
+    buffer.writeTo(out);
+  }
+  
+  @Override
+  Decoder decompress(InputStream in, Decoder vin) {
+    return vin;
+  }
+}

Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/tool/DataFileWriteTool.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/tool/DataFileWriteTool.java?rev=901959&r1=901958&r2=901959&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/tool/DataFileWriteTool.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/tool/DataFileWriteTool.java Fri Jan 22 01:28:00 2010
@@ -23,7 +23,12 @@
 import java.io.PrintStream;
 import java.util.List;
 
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+
 import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.generic.GenericDatumReader;
@@ -47,20 +52,31 @@
   @Override
   public int run(InputStream stdin, PrintStream out, PrintStream err,
       List<String> args) throws Exception {
-    if (args.size() != 2) {
+
+    OptionParser p = new OptionParser();
+    OptionSpec<String> codec =
+      p.accepts("codec", "Compression codec")
+      .withRequiredArg()
+      .defaultsTo("null")
+      .ofType(String.class);
+    OptionSet opts = p.parse(args.toArray(new String[0]));
+
+    if (opts.nonOptionArguments().size() != 2) {
       err.println("Expected 2 args: schema input_file");
+      p.printHelpOn(err);
       return 1;
     }
-    
+
     Schema schema = Schema.parse(args.get(0));
     DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
-    
+
     InputStream input = Util.fileOrStdin(args.get(1), stdin);
     try {
       DataInputStream din = new DataInputStream(input);
       DataFileWriter<Object> writer =
-        new DataFileWriter<Object>(new GenericDatumWriter<Object>())
-        .create(schema, out);
+        new DataFileWriter<Object>(new GenericDatumWriter<Object>());
+      writer.setCodec(CodecFactory.fromString(codec.value(opts)));
+      writer.create(schema, out);
       Decoder decoder = new JsonDecoder(schema, din);
       Object datum;
       while (true) {

Modified: hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFile.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFile.java?rev=901959&r1=901958&r2=901959&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFile.java (original)
+++ hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFile.java Fri Jan 22 01:28:00 2010
@@ -17,6 +17,16 @@
  */
 package org.apache.avro;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericDatumReader;
@@ -24,23 +34,41 @@
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import java.util.Random;
-import java.io.File;
-import java.io.IOException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+@RunWith(Parameterized.class)
 public class TestDataFile {
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestDataFile.class);
+
+  CodecFactory codec = null;
+  public TestDataFile(CodecFactory codec) {
+    this.codec = codec;
+    LOG.info("Running with codec: " + codec);
+  }
+
+  @Parameters
+  public static List<Object[]> codecs() {
+    List<Object[]> r = new ArrayList<Object[]>();
+    r.add(new Object[] { null });
+    r.add(new Object[] { CodecFactory.deflateCodec(1) });
+    r.add(new Object[] { CodecFactory.deflateCodec(9) });
+    r.add(new Object[] { CodecFactory.nullCodec() });
+    return r;
+  }
+
   private static final int COUNT =
     Integer.parseInt(System.getProperty("test.count", "10"));
-  private static final boolean VALIDATE = 
+  private static final boolean VALIDATE =
     !"false".equals(System.getProperty("test.validate", "true"));
   private static final File DIR
     = new File(System.getProperty("test.dir", "/tmp"));
   private static final File DATAFILE_DIR
     = new File(System.getProperty("test.dir", "/tmp"));
-  private static final File FILE = new File(DIR, "test.avro");
   private static final long SEED = System.currentTimeMillis();
 
   private static final String SCHEMA_JSON =
@@ -49,12 +77,19 @@
     +"{\"name\":\"longField\", \"type\":\"long\"}]}";
   private static final Schema SCHEMA = Schema.parse(SCHEMA_JSON);
 
+  private File makeFile() {
+    return new File(DIR, "test-" + codec + ".avro");
+  }
+
   @Test
   public void testGenericWrite() throws IOException {
     DataFileWriter<Object> writer =
       new DataFileWriter<Object>(new GenericDatumWriter<Object>())
-      .setSyncInterval(100)
-      .create(SCHEMA, FILE);
+      .setSyncInterval(100);
+    if (codec != null) {
+      writer.setCodec(codec);
+    }
+    writer.create(SCHEMA, makeFile());
     try {
       int count = 0;
       for (Object datum : new RandomData(SCHEMA, COUNT, SEED)) {
@@ -70,7 +105,7 @@
   @Test
   public void testGenericRead() throws IOException {
     DataFileReader<Object> reader =
-      new DataFileReader<Object>(FILE, new GenericDatumReader<Object>());
+      new DataFileReader<Object>(makeFile(), new GenericDatumReader<Object>());
     try {
       Object datum = null;
       if (VALIDATE) {
@@ -90,12 +125,13 @@
 
   @Test
   public void testSplits() throws IOException {
+    File file = makeFile();
     DataFileReader<Object> reader =
-      new DataFileReader<Object>(FILE, new GenericDatumReader<Object>());
+      new DataFileReader<Object>(file, new GenericDatumReader<Object>());
     Random rand = new Random(SEED);
     try {
       int splits = 10;                            // number of splits
-      int length = (int)FILE.length();            // length of file
+      int length = (int)file.length();            // length of file
       int end = length;                           // end of split
       int remaining = end;                        // bytes remaining
       int count = 0;                              // count of entries
@@ -117,10 +153,11 @@
 
   @Test
   public void testGenericAppend() throws IOException {
-    long start = FILE.length();
+    File file = makeFile();
+    long start = file.length();
     DataFileWriter<Object> writer =
       new DataFileWriter<Object>(new GenericDatumWriter<Object>())
-      .appendTo(FILE);
+      .appendTo(file);
     try {
       for (Object datum : new RandomData(SCHEMA, COUNT, SEED+1)) {
         writer.append(datum);
@@ -129,7 +166,7 @@
       writer.close();
     }
     DataFileReader<Object> reader =
-      new DataFileReader<Object>(FILE, new GenericDatumReader<Object>());
+      new DataFileReader<Object>(file, new GenericDatumReader<Object>());
     try {
       reader.seek(start);
       Object datum = null;
@@ -162,7 +199,7 @@
     Schema projection = null;
     if (args.length > 1)
       projection = Schema.parse(new File(args[1]));
-    TestDataFile tester = new TestDataFile();
+    TestDataFile tester = new TestDataFile(null);
     tester.readFile(input, new GenericDatumReader<Object>(null, projection));
     long start = System.currentTimeMillis();
     for (int i = 0; i < 4; i++)
@@ -195,7 +232,7 @@
   //   }
 
     private void readFiles(DatumReader<Object> datumReader) throws IOException {
-      TestDataFile test = new TestDataFile();
+      TestDataFile test = new TestDataFile(null);
       for (File f : DATAFILE_DIR.listFiles())
         test.readFile(f, datumReader);
     }

Added: hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileDeflate.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileDeflate.java?rev=901959&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileDeflate.java (added)
+++ hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileDeflate.java Fri Jan 22 01:28:00 2010
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.avro.Schema.Type;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.util.Utf8;
+import org.junit.Test;
+
+/** Simple test of DataFileWriter and DataFileStream with deflate codec. */
+public class TestDataFileDeflate {
+  @Test
+  public void testWriteAndRead() throws IOException {
+    Schema schema = Schema.create(Type.STRING);
+
+    // Write it
+    DataFileWriter<Utf8> w = new DataFileWriter<Utf8>(new GenericDatumWriter<Utf8>(schema));
+    w.setCodec(CodecFactory.deflateCodec(6));
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    w.create(schema, baos);
+    w.append(new Utf8("hello world"));
+    w.append(new Utf8("hello moon"));
+    w.sync();
+    w.append(new Utf8("bye bye world"));
+    w.append(new Utf8("bye bye moon"));
+    w.close();
+
+    // Read it
+    DataFileStream<Utf8> r = new DataFileStream<Utf8>(
+        new ByteArrayInputStream(baos.toByteArray()),
+        new GenericDatumReader<Utf8>(schema));
+    assertEquals("hello world", r.next().toString());
+    assertEquals("hello moon", r.next().toString());
+    assertEquals("bye bye world", r.next().toString());
+    assertEquals("bye bye moon", r.next().toString());
+    assertFalse(r.hasNext());
+  }
+}

Added: hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/file/TestLengthLimitedInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/file/TestLengthLimitedInputStream.java?rev=901959&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/file/TestLengthLimitedInputStream.java (added)
+++ hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/file/TestLengthLimitedInputStream.java Fri Jan 22 01:28:00 2010
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.file;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestLengthLimitedInputStream {
+  InputStream raw;
+
+  @Before
+  public void setupRawStream() {
+    byte[] buf = new byte[128];
+    for (int i = 0; i < 128; ++i) {
+      buf[i] = (byte)i;
+    }
+    raw = new ByteArrayInputStream(buf);
+  }
+
+  @Test
+  public void testAvailable() throws IOException {
+    InputStream is = new LengthLimitedInputStream(raw, 10);
+    assertEquals(10, is.available());
+    is.skip(100);
+    assertEquals(0, is.available());
+  }
+
+  @Test
+  public void testRead() throws IOException {
+    InputStream is = new LengthLimitedInputStream(raw, 10);
+    byte[] x = new byte[12];
+    assertEquals(0, is.read());
+    assertEquals(9, is.read(x));
+    assertEquals(-1, is.read(x));
+    assertEquals(x[8], 9);
+  }
+}

Modified: hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/tool/TestDataFileTools.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/tool/TestDataFileTools.java?rev=901959&r1=901958&r2=901959&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/tool/TestDataFileTools.java (original)
+++ hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/tool/TestDataFileTools.java Fri Jan 22 01:28:00 2010
@@ -25,7 +25,10 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 
 import org.apache.avro.AvroTestUtil;
 import org.apache.avro.Schema;
@@ -93,16 +96,30 @@
   }
   
   @Test
+  public void testWriteWithDeflate() throws Exception {
+    testWrite("deflate", Arrays.asList("--codec", "deflate"), "deflate");
+  }
+  
+  @Test
   public void testWrite() throws Exception {
+    testWrite("plain", Collections.<String>emptyList(), "null");
+  }
+  
+  public void testWrite(String name, List<String> extra, String expectedCodec) 
+      throws Exception {
     File outFile = AvroTestUtil.tempFile(
-        TestDataFileTools.class + ".testWrite." + ".avro");
+        TestDataFileTools.class + ".testWrite." + name + ".avro");
     FileOutputStream fout = new FileOutputStream(outFile);
     PrintStream out = new PrintStream(fout);
+    List<String> args = new ArrayList<String>();
+    args.add(schema.toString());
+    args.add("-");
+    args.addAll(extra);
     new DataFileWriteTool().run(
         new StringInputStream(jsonData),
         new PrintStream(out), // stdout
         null, // stderr
-        Arrays.asList(schema.toString(), "-"));
+        args);
     out.close();
     fout.close();
     
@@ -116,6 +133,7 @@
     }
     assertEquals(COUNT, i);
     assertEquals(schema, fileReader.getSchema());
+    assertEquals(expectedCodec, fileReader.getMetaString("codec"));
   }
   
   @Test
@@ -149,7 +167,7 @@
     DataFileReader<Object> fileReader = 
       new DataFileReader<Object>(outFile,reader);
     int i = 0;
-    for (Object datum : fileReader) {
+    for (@SuppressWarnings("unused") Object datum : fileReader) {
       i++;
     }
     return i;