You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text rendering.)
Posted to commits@avro.apache.org by cu...@apache.org on 2010/08/24 23:22:54 UTC
svn commit: r988732 - in /avro/trunk: ./ lang/java/src/java/org/apache/avro/file/ lang/java/src/test/java/org/apache/avro/
Author: cutting
Date: Tue Aug 24 21:22:52 2010
New Revision: 988732
URL: http://svn.apache.org/viewvc?rev=988732&view=rev
Log:
AVRO-541. Fix sporadic corruption when appending a compressed file to an uncompressed file. Contributed by scottcarey.
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java
avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java
avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java
avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileConcat.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=988732&r1=988731&r2=988732&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Aug 24 21:22:52 2010
@@ -175,6 +175,9 @@ Avro 1.4.0 (unreleased)
AVRO-590. IDL: Fix order specifications. (cutting)
+ AVRO-541. Java: Fix sporadic corruption when appending a
+ compressed file to an uncompressed file. (scottcarey via cutting)
+
Avro 1.3.3 (7 June 2010)
IMPROVEMENTS
Modified: avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java?rev=988732&r1=988731&r2=988732&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java Tue Aug 24 21:22:52 2010
@@ -31,6 +31,7 @@ import java.util.NoSuchElementException;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
@@ -160,9 +161,9 @@ public class DataFileStream<D> implement
}
}
if (hasNextBlock()) {
- block = nextBlock(block);
- blockBuffer = ByteBuffer.wrap(block.data, 0, block.blockSize);
- blockBuffer = codec.decompress(blockBuffer);
+ block = nextRawBlock(block);
+ block.decompressUsing(codec);
+ blockBuffer = block.getAsByteBuffer();
datumIn = DecoderFactory.defaultFactory().createBinaryDecoder(
blockBuffer.array(), blockBuffer.arrayOffset() +
blockBuffer.position(), blockBuffer.remaining(), datumIn);
@@ -240,7 +241,7 @@ public class DataFileStream<D> implement
}
}
- DataBlock nextBlock(DataBlock reuse) throws IOException {
+ DataBlock nextRawBlock(DataBlock reuse) throws IOException {
if (!hasNextBlock()) {
throw new NoSuchElementException();
}
@@ -252,15 +253,11 @@ public class DataFileStream<D> implement
}
// throws if it can't read the size requested
vin.readFixed(reuse.data, 0, reuse.blockSize);
- skipSync();
- availableBlock = false;
- return reuse;
- }
-
- void skipSync() throws IOException {
vin.readFixed(syncBuffer);
if (!Arrays.equals(syncBuffer, sync))
throw new IOException("Invalid sync!");
+ availableBlock = false;
+ return reuse;
}
/** Not supported. */
@@ -272,14 +269,58 @@ public class DataFileStream<D> implement
}
static class DataBlock {
- byte[] data;
- long numEntries;
- int blockSize;
- DataBlock(long numEntries, int blockSize) {
+ private byte[] data;
+ private long numEntries;
+ private int blockSize;
+ private int offset = 0;
+ private DataBlock(long numEntries, int blockSize) {
this.data = new byte[blockSize];
this.numEntries = numEntries;
this.blockSize = blockSize;
}
+
+ DataBlock(ByteBuffer block, long numEntries) {
+ this.data = block.array();
+ this.blockSize = block.remaining();
+ this.offset = block.arrayOffset() + block.position();
+ this.numEntries = numEntries;
+ }
+
+ byte[] getData() {
+ return data;
+ }
+
+ long getNumEntries() {
+ return numEntries;
+ }
+
+ int getBlockSize() {
+ return blockSize;
+ }
+
+ ByteBuffer getAsByteBuffer() {
+ return ByteBuffer.wrap(data, offset, blockSize);
+ }
+
+ void decompressUsing(Codec c) throws IOException {
+ ByteBuffer result = c.decompress(getAsByteBuffer());
+ data = result.array();
+ blockSize = result.remaining();
+ }
+
+ void compressUsing(Codec c) throws IOException {
+ ByteBuffer result = c.compress(getAsByteBuffer());
+ data = result.array();
+ blockSize = result.remaining();
+ }
+
+ void writeBlockTo(BinaryEncoder e, byte[] sync) throws IOException {
+ e.writeLong(this.numEntries);
+ e.writeLong(this.blockSize);
+ e.writeFixed(this.data, offset, this.blockSize);
+ e.writeFixed(sync);
+ }
+
}
}
Modified: avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java?rev=988732&r1=988731&r2=988732&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java Tue Aug 24 21:22:52 2010
@@ -58,7 +58,7 @@ public class DataFileWriter<D> implement
private DatumWriter<D> dout;
private BufferedFileOutputStream out;
- private Encoder vout;
+ private BinaryEncoder vout;
private final Map<String,byte[]> meta = new HashMap<String,byte[]>();
@@ -292,40 +292,27 @@ public class DataFileWriter<D> implement
if (codec.equals(otherCodec) && !recompress) {
// copy raw bytes
while(otherFile.hasNextBlock()) {
- nextBlockRaw = otherFile.nextBlock(nextBlockRaw);
- writeRawBlock(nextBlockRaw);
+ nextBlockRaw = otherFile.nextRawBlock(nextBlockRaw);
+ nextBlockRaw.writeBlockTo(vout, sync);
}
} else {
while(otherFile.hasNextBlock()) {
- nextBlockRaw = otherFile.nextBlock(nextBlockRaw);
- ByteBuffer uncompressedData = otherCodec.decompress(ByteBuffer.wrap(
- nextBlockRaw.data, 0, nextBlockRaw.blockSize));
- ByteBuffer compressed = codec.compress(uncompressedData);
- nextBlockRaw.data = compressed.array();
- nextBlockRaw.blockSize = compressed.remaining();
- writeRawBlock(nextBlockRaw);
+ nextBlockRaw = otherFile.nextRawBlock(nextBlockRaw);
+ nextBlockRaw.decompressUsing(otherCodec);
+ nextBlockRaw.compressUsing(codec);
+ nextBlockRaw.writeBlockTo(vout, sync);
}
}
}
- private void writeRawBlock(DataBlock rawBlock) throws IOException {
- vout.writeLong(rawBlock.numEntries);
- vout.writeLong(rawBlock.blockSize);
- vout.writeFixed(rawBlock.data, 0, rawBlock.blockSize);
- vout.writeFixed(sync);
- }
-
private void writeBlock() throws IOException {
if (blockCount > 0) {
- vout.writeLong(blockCount);
ByteBuffer uncompressed = buffer.getByteArrayAsByteBuffer();
- ByteBuffer block = codec.compress(uncompressed);
- vout.writeLong(block.remaining());
- vout.writeFixed(block.array(), block.position() + block.arrayOffset(),
- block.remaining());
+ DataBlock block = new DataBlock(uncompressed, blockCount);
+ block.compressUsing(codec);
+ block.writeBlockTo(vout, sync);
buffer.reset();
blockCount = 0;
- vout.writeFixed(sync);
}
}
@@ -371,7 +358,7 @@ public class DataFileWriter<D> implement
public long tell() { return position+count; }
}
- static class NonCopyingByteArrayOutputStream extends ByteArrayOutputStream {
+ private static class NonCopyingByteArrayOutputStream extends ByteArrayOutputStream {
NonCopyingByteArrayOutputStream(int initialSize) {
super(initialSize);
}
Modified: avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java?rev=988732&r1=988731&r2=988732&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java Tue Aug 24 21:22:52 2010
@@ -17,17 +17,14 @@
*/
package org.apache.avro.file;
-import java.io.ByteArrayInputStream;
-import java.io.EOFException;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.InputStream;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
-import java.util.zip.InflaterInputStream;
-
-import org.apache.avro.file.DataFileWriter.NonCopyingByteArrayOutputStream;
+import java.util.zip.InflaterOutputStream;
/**
* Implements DEFLATE (RFC1951) compression and decompression.
@@ -44,7 +41,7 @@ class DeflateCodec extends Codec {
static class Option extends CodecFactory {
private int compressionLevel;
- public Option(int compressionLevel) {
+ Option(int compressionLevel) {
this.compressionLevel = compressionLevel;
}
@@ -54,7 +51,7 @@ class DeflateCodec extends Codec {
}
}
- NonCopyingByteArrayOutputStream compressionBuffer;
+ private ByteArrayOutputStream outputBuffer;
private Deflater deflater;
private Inflater inflater;
//currently only do 'nowrap' -- RFC 1951, not zlib
@@ -72,57 +69,60 @@ class DeflateCodec extends Codec {
@Override
ByteBuffer compress(ByteBuffer data) throws IOException {
- if (compressionBuffer == null) {
- compressionBuffer = new NonCopyingByteArrayOutputStream(
- data.remaining());
- }
- if (deflater == null) {
- deflater = new Deflater(compressionLevel, nowrap);
- }
- // Pass output through deflate
- DeflaterOutputStream deflaterStream =
- new DeflaterOutputStream(compressionBuffer, deflater);
- deflaterStream.write(data.array(),
- data.position() + data.arrayOffset(),
- data.limit() + data.arrayOffset());
- deflaterStream.finish();
- ByteBuffer result = compressionBuffer.getByteArrayAsByteBuffer();
- deflater.reset();
- compressionBuffer.reset();
+ ByteArrayOutputStream baos = getOutputBuffer(data.remaining());
+ DeflaterOutputStream ios = new DeflaterOutputStream(baos, getDeflater());
+ writeAndClose(data, ios);
+ ByteBuffer result = ByteBuffer.wrap(baos.toByteArray());
return result;
}
@Override
ByteBuffer decompress(ByteBuffer data) throws IOException {
- if (compressionBuffer == null) {
- compressionBuffer = new NonCopyingByteArrayOutputStream(
- data.remaining());
+ ByteArrayOutputStream baos = getOutputBuffer(data.remaining());
+ InflaterOutputStream ios = new InflaterOutputStream(baos, getInflater());
+ writeAndClose(data, ios);
+ ByteBuffer result = ByteBuffer.wrap(baos.toByteArray());
+ return result;
+ }
+
+ private void writeAndClose(ByteBuffer data, OutputStream to) throws IOException {
+ byte[] input = data.array();
+ int offset = data.arrayOffset() + data.position();
+ int length = data.remaining();
+ try {
+ to.write(input, offset, length);
+ } finally {
+ to.close();
}
- if (inflater == null) {
+ }
+
+ // get and initialize the inflater for use.
+ private Inflater getInflater() {
+ if (null == inflater) {
inflater = new Inflater(nowrap);
}
- InputStream uncompressed = new InflaterInputStream(
- new ByteArrayInputStream(data.array(),
- data.position() + data.arrayOffset(),
- data.remaining()), inflater);
- int read;
- byte[] buff = new byte[2048];
- try {
- while (true) {
- read = uncompressed.read(buff);
- if (read < 0) break;
- compressionBuffer.write(buff, 0, read);
- }
- } catch (EOFException e) {
- // sometimes InflaterInputStream.read
- // throws this instead of returning -1
- }
- ByteBuffer result = compressionBuffer.getByteArrayAsByteBuffer();
inflater.reset();
- compressionBuffer.reset();
- return result;
+ return inflater;
}
+ // get and initialize the deflater for use.
+ private Deflater getDeflater() {
+ if (null == deflater) {
+ deflater = new Deflater(compressionLevel, nowrap);
+ }
+ deflater.reset();
+ return deflater;
+ }
+
+ // get and initialize the output buffer for use.
+ private ByteArrayOutputStream getOutputBuffer(int suggestedLength) {
+ if (null == outputBuffer) {
+ outputBuffer = new ByteArrayOutputStream(suggestedLength);
+ }
+ outputBuffer.reset();
+ return outputBuffer;
+ }
+
@Override
public int hashCode() {
return nowrap ? 0 : 1;
Modified: avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileConcat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileConcat.java?rev=988732&r1=988731&r2=988732&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileConcat.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileConcat.java Tue Aug 24 21:22:52 2010
@@ -17,7 +17,7 @@
*/
package org.apache.avro;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
import java.io.File;
import java.io.IOException;
@@ -78,8 +78,10 @@ public class TestDataFileConcat {
private static final String SCHEMA_JSON =
"{\"type\": \"record\", \"name\": \"Test\", \"fields\": ["
- +"{\"name\":\"stringField\", \"type\":\"string\"},"
- +"{\"name\":\"longField\", \"type\":\"long\"}]}";
+ +"{\"name\":\"stringField\", \"type\":\"string\"}" +
+ ","
+ +"{\"name\":\"longField\", \"type\":\"long\"}" +
+ "]}";
private static final Schema SCHEMA = Schema.parse(SCHEMA_JSON);
private File makeFile(String name) {
@@ -88,67 +90,87 @@ public class TestDataFileConcat {
@Test
public void testConcateateFiles() throws IOException {
- File file1 = makeFile((codec == null ? "null" : codec.toString()) + "-A");
- File file2 = makeFile((codec2 == null ? "null" : codec2.toString()) + "-B");
- DataFileWriter<Object> writer =
- new DataFileWriter<Object>(new GenericDatumWriter<Object>())
- .setSyncInterval(500);
- if (codec != null) {
- writer.setCodec(codec);
- }
- writer.create(SCHEMA, file1);
- try {
- for (Object datum : new RandomData(SCHEMA, COUNT, SEED)) {
- writer.append(datum);
+ System.out.println("SEED = "+SEED);
+ System.out.println("COUNT = "+COUNT);
+ for (int k = 0; k < 60; k++) {
+ int syncInterval = 460 +k;
+ RandomData data1 = new RandomData(SCHEMA, COUNT, SEED);
+ RandomData data2 = new RandomData(SCHEMA, COUNT, SEED+1);
+ File file1 = makeFile((codec == null ? "null" : codec.toString()) + "-A");
+ File file2 = makeFile((codec2 == null ? "null" : codec2.toString()) + "-B");
+ DataFileWriter<Object> writer =
+ new DataFileWriter<Object>(new GenericDatumWriter<Object>())
+ .setSyncInterval(syncInterval);
+ if (codec != null) {
+ writer.setCodec(codec);
}
- } finally {
- writer.close();
- }
- DataFileWriter<Object> writer2 =
- new DataFileWriter<Object>(new GenericDatumWriter<Object>())
- .setSyncInterval(500);
- if (codec2 != null) {
- writer2.setCodec(codec2);
- }
- writer2.create(SCHEMA, file2);
- try {
- for (Object datum : new RandomData(SCHEMA, COUNT, SEED+1)) {
- writer2.append(datum);
+ writer.create(SCHEMA, file1);
+ try {
+ for (Object datum : data1) {
+ writer.append(datum);
+ }
+ } finally {
+ writer.close();
}
- } finally {
- writer2.close();
- }
- DataFileWriter<Object> concatinto =
- new DataFileWriter<Object>(new GenericDatumWriter<Object>())
- .setSyncInterval(500);
- concatinto.appendTo(file1);
- DataFileReader<Object> concatfrom =
- new DataFileReader<Object>(file2, new GenericDatumReader<Object>());
- concatinto.appendAllFrom(concatfrom, recompress);
- concatinto.close();
-
- DataFileReader<Object> concat =
- new DataFileReader<Object>(file1, new GenericDatumReader<Object>());
-
- try {
- Object datum = null;
- if (VALIDATE) {
- for (Object expected : new RandomData(SCHEMA, COUNT, SEED)) {
- datum = concat.next(datum);
- assertEquals(expected, datum);
+ DataFileWriter<Object> writer2 =
+ new DataFileWriter<Object>(new GenericDatumWriter<Object>())
+ .setSyncInterval(syncInterval);
+ if (codec2 != null) {
+ writer2.setCodec(codec2);
+ }
+ writer2.create(SCHEMA, file2);
+ try {
+ for (Object datum : data2) {
+ writer2.append(datum);
}
- for (Object expected : new RandomData(SCHEMA, COUNT, SEED+1)) {
- datum = concat.next(datum);
- assertEquals(expected, datum);
+ } finally {
+ writer2.close();
+ }
+ DataFileWriter<Object> concatinto =
+ new DataFileWriter<Object>(new GenericDatumWriter<Object>())
+ .setSyncInterval(syncInterval);
+ concatinto.appendTo(file1);
+ DataFileReader<Object> concatfrom =
+ new DataFileReader<Object>(file2, new GenericDatumReader<Object>());
+ concatinto.appendAllFrom(concatfrom, recompress);
+ concatinto.close();
+ concatfrom.close();
+
+ concatfrom = new DataFileReader<Object>(file2, new GenericDatumReader<Object>());
+
+
+ DataFileReader<Object> concat =
+ new DataFileReader<Object>(file1, new GenericDatumReader<Object>());
+ int count = 0;
+ try {
+ Object datum = null;
+ if (VALIDATE) {
+ for (Object expected : data1) {
+ datum = concat.next(datum);
+ assertEquals("at "+count++, expected, datum);
+ }
+ for (Object expected : data2) {
+ datum = concatfrom.next(datum);
+ assertEquals("at "+count++, expected, datum);
+ }
+ for (Object expected : data2) {
+ datum = concat.next(datum);
+ assertEquals("at "+count++, expected, datum);
+ }
+ } else {
+ for (int i = 0; i < COUNT*2; i++) {
+ datum = concat.next(datum);
+ }
}
- } else {
- for (int i = 0; i < COUNT*2; i++) {
- datum = concat.next(datum);
+ } finally {
+ if (count != 3 * COUNT) {
+ System.out.println(count + " " + k);
}
+ concat.close();
+ concatfrom.close();
}
- } finally {
- concat.close();
- }
+ }
}
+
}