You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by ph...@apache.org on 2010/01/29 20:29:19 UTC

svn commit: r904604 - in /hadoop/avro/trunk: ./ doc/src/content/xdocs/ lang/java/src/java/org/apache/avro/file/ lang/java/src/test/java/org/apache/avro/ lang/java/src/test/java/org/apache/avro/tool/

Author: philz
Date: Fri Jan 29 19:29:17 2010
New Revision: 904604

URL: http://svn.apache.org/viewvc?rev=904604&view=rev
Log:
AVRO-380.  Avro Container File format change: add block size to block descriptor.  Contributed by Scott Carey.

Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/doc/src/content/xdocs/spec.xml
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java
    hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFile.java
    hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileMeta.java
    hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/tool/TestDataFileTools.java

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=904604&r1=904603&r2=904604&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Fri Jan 29 19:29:17 2010
@@ -79,6 +79,9 @@
     AVRO-368. Reserve avro.* in object container files, and 
     rename existing reserved words. (philz)
 
+    AVRO-380. Avro Container File format change: add block size to block
+    descriptor.  (Scott Carey via philz)
+
   IMPROVEMENTS
 
     AVRO-157. Changes from code review comments for C++. (sbanacho)

Modified: hadoop/avro/trunk/doc/src/content/xdocs/spec.xml
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/doc/src/content/xdocs/spec.xml?rev=904604&r1=904603&r2=904604&view=diff
==============================================================================
--- hadoop/avro/trunk/doc/src/content/xdocs/spec.xml (original)
+++ hadoop/avro/trunk/doc/src/content/xdocs/spec.xml Fri Jan 29 19:29:17 2010
@@ -645,11 +645,15 @@
       <p>A file data block consists of:</p>
       <ul>
 	<li>A long indicating the count of objects in this block.</li>
+	<li>A long indicating the size in bytes of the serialized objects
+	in the current block, after any codec is applied.</li>
 	<li>The serialized objects.  If a codec is specified, this is
 	compressed by that codec.</li>
 	<li>The file's 16-byte sync marker.</li>
       </ul>
-
+	  <p>Thus, each block's binary data can be efficiently extracted or skipped without
+	  deserializing the contents.  The combination of block size, object counts, and 
+	  sync markers enables detection of corrupt blocks and helps ensure data integrity.</p>
       <section>
       <title>Required Codecs</title>
         <section>

Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java?rev=904604&r1=904603&r2=904604&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java Fri Jan 29 19:29:17 2010
@@ -19,8 +19,6 @@
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 
 import org.apache.avro.io.Decoder;
 
@@ -32,8 +30,8 @@
 abstract class Codec {
   /** Name of the codec; written to the file's metadata. */
   abstract String getName();
-  /** Compresses the input data into out. */
-  abstract void compress(ByteArrayOutputStream data, OutputStream out) throws IOException;
-  /** Returns a decoder on the uncompressed stream. */
-  abstract Decoder decompress(InputStream in, Decoder vin) throws IOException;
+  /** Compresses the input data and returns the result as a ByteArrayOutputStream. */
+  abstract ByteArrayOutputStream compress(ByteArrayOutputStream data) throws IOException;
+  /** Returns a decoder on the uncompressed data. */
+  abstract Decoder decompress(byte[] compressedData) throws IOException;
 }

Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java?rev=904604&r1=904603&r2=904604&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java Fri Jan 29 19:29:17 2010
@@ -48,7 +48,7 @@
   /** Decoder on raw input stream.  (Used for metadata.) */
   final Decoder vin;
   /** Secondary decoder, for datums.
-   *  (Different than vin for compressed segments.) */
+   *  (Different than vin for block segments.) */
   Decoder datumIn = null;
 
   Map<String,byte[]> meta = new HashMap<String,byte[]>();
@@ -138,8 +138,29 @@
   public boolean hasNext() {
     try {
       if (blockRemaining == 0) {
+        // check that the previous block was finished
+        // clunky and inefficient with the current Decoder API
+        // the only way to detect end of stream is with EOFException
+        if (null != datumIn) {
+          try {
+            datumIn.readBoolean();
+            throw new IOException("Block read partially, the data may be corrupt");
+          } catch (EOFException eof) {
+            // this indicates the block is at its end as we expect
+          }
+        }
         blockRemaining = vin.readLong();          // read block count
-        datumIn = codec.decompress(in, vin);
+        long compressedSize = vin.readLong(); // read block size
+        if (compressedSize > Integer.MAX_VALUE) {
+          throw new IOException("Block size too large: " + compressedSize);
+        }
+        byte[] block = new byte[(int)compressedSize];
+        // if vin buffers in, the code below will need to handle it
+        int sizeRead = in.read(block); 
+        if (sizeRead != compressedSize) {
+          throw new IOException("Incomplete Block");
+        }
+        datumIn = codec.decompress(block);
       }
       return blockRemaining != 0;
     } catch (EOFException e) {                    // at EOF

Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java?rev=904604&r1=904603&r2=904604&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java Fri Jan 29 19:29:17 2010
@@ -61,7 +61,7 @@
 
   private final Map<String,byte[]> meta = new HashMap<String,byte[]>();
 
-  private int blockCount;                       // # entries in current block
+  private long blockCount;                       // # entries in current block
 
   private ByteArrayOutputStream buffer;
   private Encoder bufOut;
@@ -95,8 +95,22 @@
     return this;
   }
 
-  /** Set the synchronization interval for this file, in bytes. */
+  /**
+   * Set the synchronization interval for this file, in bytes. 
+   * Valid values range from 32 to 2^30
+   * Suggested values are between 2K and 2M
+   * 
+   * Invalid values throw IllegalArgumentException
+   * 
+   * @param syncInterval 
+   *   the approximate number of uncompressed bytes to write in each block
+   * @return 
+   *   this DataFileWriter
+   */
   public DataFileWriter<D> setSyncInterval(int syncInterval) {
+    if (syncInterval < 32 || syncInterval > (1 << 30)) {
+      throw new IllegalArgumentException("Invalid syncInterval value: " + syncInterval);
+    }
     this.syncInterval = syncInterval;
     return this;
   }
@@ -127,6 +141,7 @@
       vout.writeBytes(entry.getValue());
     }
     vout.writeMapEnd();
+    vout.flush(); //vout may be buffered, flush before writing to out
 
     out.write(sync);                              // write initial sync
 
@@ -166,7 +181,8 @@
     this.out = new BufferedFileOutputStream(outs);
     this.vout = new BinaryEncoder(out);
     dout.setSchema(schema);
-    this.buffer = new ByteArrayOutputStream(syncInterval*2);
+    this.buffer = new ByteArrayOutputStream(
+        Math.min((int)(syncInterval * 1.25), Integer.MAX_VALUE/2 -1));
     if (this.codec == null) {
       this.codec = CodecFactory.nullCodec().createInstance();
     }
@@ -236,7 +252,10 @@
   private void writeBlock() throws IOException {
     if (blockCount > 0) {
       vout.writeLong(blockCount);
-      codec.compress(buffer, out);
+      ByteArrayOutputStream block = codec.compress(buffer);
+      vout.writeLong(block.size());
+      vout.flush(); //vout may be buffered, flush before writing to out
+      block.writeTo(out);
       buffer.reset();
       blockCount = 0;
       out.write(sync);
@@ -285,4 +304,3 @@
   }
 
 }
-

Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java?rev=904604&r1=904603&r2=904604&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java Fri Jan 29 19:29:17 2010
@@ -17,17 +17,16 @@
  */
 package org.apache.avro.file;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.zip.Deflater;
 import java.util.zip.DeflaterOutputStream;
 import java.util.zip.Inflater;
 import java.util.zip.InflaterInputStream;
 
 import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.Decoder;
 
 /** 
@@ -58,7 +57,6 @@
   ByteArrayOutputStream compressionBuffer;
   private Deflater deflater;
   private int compressionLevel;
-  private Inflater inflater;
 
   public DeflateCodec(int compressionLevel) {
     this.compressionLevel = compressionLevel;
@@ -70,34 +68,30 @@
   }
 
   @Override
-  void compress(ByteArrayOutputStream buffer, OutputStream out) throws IOException {
+  ByteArrayOutputStream compress(ByteArrayOutputStream buffer) 
+    throws IOException {
     if (compressionBuffer == null) {
       compressionBuffer = new ByteArrayOutputStream(buffer.size());
+    } else {
+      compressionBuffer.reset();
     }
     if (deflater == null) {
-      deflater = new Deflater(compressionLevel, false);
+      deflater = new Deflater(compressionLevel, true);
     }
-    // Pass output through deflate, and prepend with length of compressed output.
+    // Pass output through deflate
     DeflaterOutputStream deflaterStream = 
       new DeflaterOutputStream(compressionBuffer, deflater);
     buffer.writeTo(deflaterStream);
     deflaterStream.finish();
-    new BinaryEncoder(out).writeLong(compressionBuffer.size());
-    compressionBuffer.writeTo(out);
-    compressionBuffer.reset();
     deflater.reset();
+    return compressionBuffer;
   }
 
   @Override
-  Decoder decompress(InputStream in, Decoder vin) throws IOException {
-    if (inflater == null) {
-      inflater = new Inflater(false);
-    }
-    long compressedLength = vin.readLong();
+  Decoder decompress(byte[] in) throws IOException {
+    Inflater inflater = new Inflater(true);
     InputStream uncompressed = new InflaterInputStream(
-        new LengthLimitedInputStream(in, compressedLength),
-        inflater);
-    inflater.reset();
+        new ByteArrayInputStream(in), inflater);
     return new BinaryDecoder(uncompressed);
   }
 

Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java?rev=904604&r1=904603&r2=904604&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java Fri Jan 29 19:29:17 2010
@@ -17,11 +17,11 @@
  */
 package org.apache.avro.file;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 
+import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.Decoder;
 
 /** Implements "null" (pass through) codec. */
@@ -46,12 +46,12 @@
   }
 
   @Override
-  void compress(ByteArrayOutputStream buffer, OutputStream out) throws IOException {
-    buffer.writeTo(out);
+  ByteArrayOutputStream compress(ByteArrayOutputStream buffer) throws IOException {
+    return buffer;
   }
   
   @Override
-  Decoder decompress(InputStream in, Decoder vin) {
-    return vin;
+  Decoder decompress(byte[] in) throws IOException {
+    return new BinaryDecoder(new ByteArrayInputStream(in));
   }
 }

Modified: hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFile.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFile.java?rev=904604&r1=904603&r2=904604&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFile.java (original)
+++ hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFile.java Fri Jan 29 19:29:17 2010
@@ -55,6 +55,7 @@
   public static List<Object[]> codecs() {
     List<Object[]> r = new ArrayList<Object[]>();
     r.add(new Object[] { null });
+    r.add(new Object[] { CodecFactory.deflateCodec(0) });
     r.add(new Object[] { CodecFactory.deflateCodec(1) });
     r.add(new Object[] { CodecFactory.deflateCodec(9) });
     r.add(new Object[] { CodecFactory.nullCodec() });
@@ -183,7 +184,7 @@
     } finally {
       reader.close();
     }
-  }
+  }  
 
   protected void readFile(File f, DatumReader<Object> datumReader)
     throws IOException {

Modified: hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileMeta.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileMeta.java?rev=904604&r1=904603&r2=904604&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileMeta.java (original)
+++ hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileMeta.java Fri Jan 29 19:29:17 2010
@@ -24,6 +24,8 @@
 import java.io.FileInputStream;
 import java.io.IOException;
 
+import junit.framework.Assert;
+
 import org.apache.avro.Schema.Type;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.file.DataFileWriter;
@@ -57,5 +59,17 @@
     w.setMeta("foo", "bar");
   }
 
-
+  @Test
+  public void testBlockSizeSetInvalid() {
+    int exceptions = 0;
+    for (int i = -1; i < 33; i++) {
+      // 33 invalid, one valid
+      try {
+        new DataFileWriter<Object>(new GenericDatumWriter<Object>()).setSyncInterval(i);
+      } catch (IllegalArgumentException iae) {
+        exceptions++;
+      }
+    }
+    Assert.assertEquals(33, exceptions);
+  }
 }

Modified: hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/tool/TestDataFileTools.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/tool/TestDataFileTools.java?rev=904604&r1=904603&r2=904604&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/tool/TestDataFileTools.java (original)
+++ hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/tool/TestDataFileTools.java Fri Jan 29 19:29:17 2010
@@ -134,7 +134,11 @@
     }
     assertEquals(COUNT, i);
     assertEquals(schema, fileReader.getSchema());
-    assertEquals(expectedCodec, fileReader.getMetaString("avro.codec"));
+    String codecStr = fileReader.getMetaString("avro.codec");
+    if (null == codecStr) {
+      codecStr = "null";
+    }
+    assertEquals(expectedCodec, codecStr);
   }
   
   @Test