You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by ph...@apache.org on 2010/01/29 20:29:19 UTC
svn commit: r904604 - in /hadoop/avro/trunk: ./ doc/src/content/xdocs/
lang/java/src/java/org/apache/avro/file/
lang/java/src/test/java/org/apache/avro/
lang/java/src/test/java/org/apache/avro/tool/
Author: philz
Date: Fri Jan 29 19:29:17 2010
New Revision: 904604
URL: http://svn.apache.org/viewvc?rev=904604&view=rev
Log:
AVRO-380. Avro Container File format change: add block size to block descriptor. Contributed by Scott Carey.
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/doc/src/content/xdocs/spec.xml
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java
hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFile.java
hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileMeta.java
hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/tool/TestDataFileTools.java
Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=904604&r1=904603&r2=904604&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Fri Jan 29 19:29:17 2010
@@ -79,6 +79,9 @@
AVRO-368. Reserve avro.* in object container files, and
rename existing reserved words. (philz)
+ AVRO-380. Avro Container File format change: add block size to block
+ descriptor. (Scott Carey via philz)
+
IMPROVEMENTS
AVRO-157. Changes from code review comments for C++. (sbanacho)
Modified: hadoop/avro/trunk/doc/src/content/xdocs/spec.xml
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/doc/src/content/xdocs/spec.xml?rev=904604&r1=904603&r2=904604&view=diff
==============================================================================
--- hadoop/avro/trunk/doc/src/content/xdocs/spec.xml (original)
+++ hadoop/avro/trunk/doc/src/content/xdocs/spec.xml Fri Jan 29 19:29:17 2010
@@ -645,11 +645,15 @@
<p>A file data block consists of:</p>
<ul>
<li>A long indicating the count of objects in this block.</li>
+ <li>A long indicating the size in bytes of the serialized objects
+ in the current block, after any codec is applied</li>
<li>The serialized objects. If a codec is specified, this is
compressed by that codec.</li>
<li>The file's 16-byte sync marker.</li>
</ul>
-
+ <p>Thus, each block's binary data can be efficiently extracted or skipped without
+ deserializing the contents. The combination of block size, object counts, and
+ sync markers enables detection of corrupt blocks and helps ensure data integrity.</p>
<section>
<title>Required Codecs</title>
<section>
Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java?rev=904604&r1=904603&r2=904604&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java Fri Jan 29 19:29:17 2010
@@ -19,8 +19,6 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import org.apache.avro.io.Decoder;
@@ -32,8 +30,8 @@
abstract class Codec {
/** Name of the codec; written to the file's metadata. */
abstract String getName();
- /** Compresses the input data into out. */
- abstract void compress(ByteArrayOutputStream data, OutputStream out) throws IOException;
- /** Returns a decoder on the uncompressed stream. */
- abstract Decoder decompress(InputStream in, Decoder vin) throws IOException;
+ /** Compresses the input data and returns the result as a ByteArrayOutputStream. */
+ abstract ByteArrayOutputStream compress(ByteArrayOutputStream data) throws IOException;
+ /** Returns a decoder on the uncompressed data. */
+ abstract Decoder decompress(byte[] compressedData) throws IOException;
}
Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java?rev=904604&r1=904603&r2=904604&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java Fri Jan 29 19:29:17 2010
@@ -48,7 +48,7 @@
/** Decoder on raw input stream. (Used for metadata.) */
final Decoder vin;
/** Secondary decoder, for datums.
- * (Different than vin for compressed segments.) */
+ * (Different than vin for block segments.) */
Decoder datumIn = null;
Map<String,byte[]> meta = new HashMap<String,byte[]>();
@@ -138,8 +138,29 @@
public boolean hasNext() {
try {
if (blockRemaining == 0) {
+ // check that the previous block was finished
+ // clunky and inefficient with the current Decoder API
+ // the only way to detect end of stream is with EOFException
+ if (null != datumIn) {
+ try {
+ datumIn.readBoolean();
+ throw new IOException("Block read partially, the data may be corrupt");
+ } catch (EOFException eof) {
+ // this indicates the block is at its end as we expect
+ }
+ }
blockRemaining = vin.readLong(); // read block count
- datumIn = codec.decompress(in, vin);
+ long compressedSize = vin.readLong(); // read block size
+ if (compressedSize > Integer.MAX_VALUE) {
+ throw new IOException("Block size too large: " + compressedSize);
+ }
+ byte[] block = new byte[(int)compressedSize];
+ // if vin buffers in, the code below will need to handle it
+ int sizeRead = in.read(block);
+ if (sizeRead != compressedSize) {
+ throw new IOException("Incomplete Block");
+ }
+ datumIn = codec.decompress(block);
}
return blockRemaining != 0;
} catch (EOFException e) { // at EOF
Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java?rev=904604&r1=904603&r2=904604&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java Fri Jan 29 19:29:17 2010
@@ -61,7 +61,7 @@
private final Map<String,byte[]> meta = new HashMap<String,byte[]>();
- private int blockCount; // # entries in current block
+ private long blockCount; // # entries in current block
private ByteArrayOutputStream buffer;
private Encoder bufOut;
@@ -95,8 +95,22 @@
return this;
}
- /** Set the synchronization interval for this file, in bytes. */
+ /**
+ * Set the synchronization interval for this file, in bytes.
+ * Valid values range from 32 to 2^30.
+ * Suggested values are between 2K and 2M.
+ *
+ * Invalid values throw IllegalArgumentException.
+ *
+ * @param syncInterval
+ * the approximate number of uncompressed bytes to write in each block
+ * @return
+ * this DataFileWriter
+ */
public DataFileWriter<D> setSyncInterval(int syncInterval) {
+ if (syncInterval < 32 || syncInterval > (1 << 30)) {
+ throw new IllegalArgumentException("Invalid syncInterval value: " + syncInterval);
+ }
this.syncInterval = syncInterval;
return this;
}
@@ -127,6 +141,7 @@
vout.writeBytes(entry.getValue());
}
vout.writeMapEnd();
+ vout.flush(); //vout may be buffered, flush before writing to out
out.write(sync); // write initial sync
@@ -166,7 +181,8 @@
this.out = new BufferedFileOutputStream(outs);
this.vout = new BinaryEncoder(out);
dout.setSchema(schema);
- this.buffer = new ByteArrayOutputStream(syncInterval*2);
+ this.buffer = new ByteArrayOutputStream(
+ Math.min((int)(syncInterval * 1.25), Integer.MAX_VALUE/2 -1));
if (this.codec == null) {
this.codec = CodecFactory.nullCodec().createInstance();
}
@@ -236,7 +252,10 @@
private void writeBlock() throws IOException {
if (blockCount > 0) {
vout.writeLong(blockCount);
- codec.compress(buffer, out);
+ ByteArrayOutputStream block = codec.compress(buffer);
+ vout.writeLong(block.size());
+ vout.flush(); //vout may be buffered, flush before writing to out
+ block.writeTo(out);
buffer.reset();
blockCount = 0;
out.write(sync);
@@ -285,4 +304,3 @@
}
}
-
Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java?rev=904604&r1=904603&r2=904604&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java Fri Jan 29 19:29:17 2010
@@ -17,17 +17,16 @@
*/
package org.apache.avro.file;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;
import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
/**
@@ -58,7 +57,6 @@
ByteArrayOutputStream compressionBuffer;
private Deflater deflater;
private int compressionLevel;
- private Inflater inflater;
public DeflateCodec(int compressionLevel) {
this.compressionLevel = compressionLevel;
@@ -70,34 +68,30 @@
}
@Override
- void compress(ByteArrayOutputStream buffer, OutputStream out) throws IOException {
+ ByteArrayOutputStream compress(ByteArrayOutputStream buffer)
+ throws IOException {
if (compressionBuffer == null) {
compressionBuffer = new ByteArrayOutputStream(buffer.size());
+ } else {
+ compressionBuffer.reset();
}
if (deflater == null) {
- deflater = new Deflater(compressionLevel, false);
+ deflater = new Deflater(compressionLevel, true);
}
- // Pass output through deflate, and prepend with length of compressed output.
+ // Pass output through deflate
DeflaterOutputStream deflaterStream =
new DeflaterOutputStream(compressionBuffer, deflater);
buffer.writeTo(deflaterStream);
deflaterStream.finish();
- new BinaryEncoder(out).writeLong(compressionBuffer.size());
- compressionBuffer.writeTo(out);
- compressionBuffer.reset();
deflater.reset();
+ return compressionBuffer;
}
@Override
- Decoder decompress(InputStream in, Decoder vin) throws IOException {
- if (inflater == null) {
- inflater = new Inflater(false);
- }
- long compressedLength = vin.readLong();
+ Decoder decompress(byte[] in) throws IOException {
+ Inflater inflater = new Inflater(true);
InputStream uncompressed = new InflaterInputStream(
- new LengthLimitedInputStream(in, compressedLength),
- inflater);
- inflater.reset();
+ new ByteArrayInputStream(in), inflater);
return new BinaryDecoder(uncompressed);
}
Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java?rev=904604&r1=904603&r2=904604&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java Fri Jan 29 19:29:17 2010
@@ -17,11 +17,11 @@
*/
package org.apache.avro.file;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
+import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.Decoder;
/** Implements "null" (pass through) codec. */
@@ -46,12 +46,12 @@
}
@Override
- void compress(ByteArrayOutputStream buffer, OutputStream out) throws IOException {
- buffer.writeTo(out);
+ ByteArrayOutputStream compress(ByteArrayOutputStream buffer) throws IOException {
+ return buffer;
}
@Override
- Decoder decompress(InputStream in, Decoder vin) {
- return vin;
+ Decoder decompress(byte[] in) throws IOException {
+ return new BinaryDecoder(new ByteArrayInputStream(in));
}
}
Modified: hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFile.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFile.java?rev=904604&r1=904603&r2=904604&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFile.java (original)
+++ hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFile.java Fri Jan 29 19:29:17 2010
@@ -55,6 +55,7 @@
public static List<Object[]> codecs() {
List<Object[]> r = new ArrayList<Object[]>();
r.add(new Object[] { null });
+ r.add(new Object[] { CodecFactory.deflateCodec(0) });
r.add(new Object[] { CodecFactory.deflateCodec(1) });
r.add(new Object[] { CodecFactory.deflateCodec(9) });
r.add(new Object[] { CodecFactory.nullCodec() });
@@ -183,7 +184,7 @@
} finally {
reader.close();
}
- }
+ }
protected void readFile(File f, DatumReader<Object> datumReader)
throws IOException {
Modified: hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileMeta.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileMeta.java?rev=904604&r1=904603&r2=904604&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileMeta.java (original)
+++ hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileMeta.java Fri Jan 29 19:29:17 2010
@@ -24,6 +24,8 @@
import java.io.FileInputStream;
import java.io.IOException;
+import junit.framework.Assert;
+
import org.apache.avro.Schema.Type;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
@@ -57,5 +59,17 @@
w.setMeta("foo", "bar");
}
-
+ @Test
+ public void testBlockSizeSetInvalid() {
+ int exceptions = 0;
+ for (int i = -1; i < 33; i++) {
+ // 33 invalid, one valid
+ try {
+ new DataFileWriter<Object>(new GenericDatumWriter<Object>()).setSyncInterval(i);
+ } catch (IllegalArgumentException iae) {
+ exceptions++;
+ }
+ }
+ Assert.assertEquals(33, exceptions);
+ }
}
Modified: hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/tool/TestDataFileTools.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/tool/TestDataFileTools.java?rev=904604&r1=904603&r2=904604&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/tool/TestDataFileTools.java (original)
+++ hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/tool/TestDataFileTools.java Fri Jan 29 19:29:17 2010
@@ -134,7 +134,11 @@
}
assertEquals(COUNT, i);
assertEquals(schema, fileReader.getSchema());
- assertEquals(expectedCodec, fileReader.getMetaString("avro.codec"));
+ String codecStr = fileReader.getMetaString("avro.codec");
+ if (null == codecStr) {
+ codecStr = "null";
+ }
+ assertEquals(expectedCodec, codecStr);
}
@Test