Posted to commits@avro.apache.org by cu...@apache.org on 2010/08/24 23:22:54 UTC

svn commit: r988732 - in /avro/trunk: ./ lang/java/src/java/org/apache/avro/file/ lang/java/src/test/java/org/apache/avro/

Author: cutting
Date: Tue Aug 24 21:22:52 2010
New Revision: 988732

URL: http://svn.apache.org/viewvc?rev=988732&view=rev
Log:
AVRO-541. Fix sporadic corruption when appending a compressed file to an uncompressed file.  Contributed by scottcarey.
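
The corruption, as the diff below suggests, traced to DeflateCodec handing out ByteBuffer views over a shared internal buffer, combined with block-copy code that ignored buffer offsets. A minimal sketch of the affected usage pattern, assuming two pre-existing Avro data files (the file names and setup are illustrative, not from the commit):

  import java.io.File;
  import java.io.IOException;

  import org.apache.avro.file.DataFileReader;
  import org.apache.avro.file.DataFileWriter;
  import org.apache.avro.generic.GenericDatumReader;
  import org.apache.avro.generic.GenericDatumWriter;

  public class ConcatSketch {
    public static void main(String[] args) throws IOException {
      File plain = new File("plain.avro");       // assumed: written with the null codec
      File deflated = new File("deflated.avro"); // assumed: written with the deflate codec

      // Re-open the uncompressed file for appending and copy the other
      // file's blocks into it.  Because the codecs differ, appendAllFrom
      // decompresses each source block with the source codec and
      // recompresses it with the target codec regardless of the flag;
      // that is the path which could sporadically corrupt data.
      DataFileWriter<Object> writer =
          new DataFileWriter<Object>(new GenericDatumWriter<Object>());
      writer.appendTo(plain);
      DataFileReader<Object> reader =
          new DataFileReader<Object>(deflated, new GenericDatumReader<Object>());
      writer.appendAllFrom(reader, false);
      writer.close();
      reader.close();
    }
  }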

Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java
    avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java
    avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileConcat.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=988732&r1=988731&r2=988732&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Aug 24 21:22:52 2010
@@ -175,6 +175,9 @@ Avro 1.4.0 (unreleased)
 
     AVRO-590. IDL: Fix order specifications. (cutting)
 
+    AVRO-541. Java: Fix sporadic corruption when appending a
+    compressed file to an uncompressed file. (scottcarey via cutting)
+
 Avro 1.3.3 (7 June 2010)
 
   IMPROVEMENTS

Modified: avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java?rev=988732&r1=988731&r2=988732&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java Tue Aug 24 21:22:52 2010
@@ -31,6 +31,7 @@ import java.util.NoSuchElementException;
 
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DatumReader;
@@ -160,9 +161,9 @@ public class DataFileStream<D> implement
           }
         }
         if (hasNextBlock()) {
-          block = nextBlock(block);
-          blockBuffer = ByteBuffer.wrap(block.data, 0, block.blockSize);
-          blockBuffer = codec.decompress(blockBuffer);
+          block = nextRawBlock(block);
+          block.decompressUsing(codec);
+          blockBuffer = block.getAsByteBuffer();
           datumIn = DecoderFactory.defaultFactory().createBinaryDecoder(
               blockBuffer.array(), blockBuffer.arrayOffset() +
               blockBuffer.position(), blockBuffer.remaining(), datumIn);
@@ -240,7 +241,7 @@ public class DataFileStream<D> implement
     }
   }
 
-  DataBlock nextBlock(DataBlock reuse) throws IOException {
+  DataBlock nextRawBlock(DataBlock reuse) throws IOException {
     if (!hasNextBlock()) {
       throw new NoSuchElementException();
     }
@@ -252,15 +253,11 @@ public class DataFileStream<D> implement
     }
     // throws if it can't read the size requested
     vin.readFixed(reuse.data, 0, reuse.blockSize);
-    skipSync();
-    availableBlock = false;
-    return reuse;
-  }
-
-  void skipSync() throws IOException {
     vin.readFixed(syncBuffer);
     if (!Arrays.equals(syncBuffer, sync))
       throw new IOException("Invalid sync!");
+    availableBlock = false;
+    return reuse;
   }
 
   /** Not supported. */
@@ -272,14 +269,58 @@ public class DataFileStream<D> implement
   }
 
   static class DataBlock {
-    byte[] data;
-    long numEntries;
-    int blockSize;
-    DataBlock(long numEntries, int blockSize) {
+    private byte[] data;
+    private long numEntries;
+    private int blockSize;
+    private int offset = 0;
+    private DataBlock(long numEntries, int blockSize) {
       this.data = new byte[blockSize];
       this.numEntries = numEntries;
       this.blockSize = blockSize;
     }
+    
+    DataBlock(ByteBuffer block, long numEntries) {
+      this.data = block.array();
+      this.blockSize = block.remaining();
+      this.offset = block.arrayOffset() + block.position();
+      this.numEntries = numEntries;
+    }
+    
+    byte[] getData() {
+      return data;
+    }
+    
+    long getNumEntries() {
+      return numEntries;
+    }
+    
+    int getBlockSize() {
+      return blockSize;
+    }
+    
+    ByteBuffer getAsByteBuffer() {
+      return ByteBuffer.wrap(data, offset, blockSize);
+    }
+    
+    void decompressUsing(Codec c) throws IOException {
+      ByteBuffer result = c.decompress(getAsByteBuffer());
+      data = result.array();
+      blockSize = result.remaining();
+    }
+    
+    void compressUsing(Codec c) throws IOException {
+      ByteBuffer result = c.compress(getAsByteBuffer());
+      data = result.array();
+      blockSize = result.remaining();
+    }
+    
+    void writeBlockTo(BinaryEncoder e, byte[] sync) throws IOException {
+      e.writeLong(this.numEntries);
+      e.writeLong(this.blockSize);
+      e.writeFixed(this.data, offset, this.blockSize);
+      e.writeFixed(sync);
+    }   
+    
   }
 
 }
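
The DataBlock refactor above centralizes offset handling: a block now remembers where its payload starts in the backing array, and getAsByteBuffer() and writeBlockTo() honour that offset instead of assuming zero. A JDK-only illustration (not from the commit) of the pitfall this guards against:

  import java.nio.ByteBuffer;

  public class OffsetPitfall {
    public static void main(String[] args) {
      byte[] backing = new byte[16];
      for (int i = 0; i < backing.length; i++) backing[i] = (byte) i;

      // A codec reusing an internal buffer may hand back a view whose
      // payload starts in the middle of the backing array.
      ByteBuffer view = ByteBuffer.wrap(backing, 4, 8).slice();

      byte wrong = view.array()[0];                                    // 0: assumes offset zero
      byte right = view.array()[view.arrayOffset() + view.position()]; // 4: honours the offset

      System.out.println(wrong + " vs " + right);
    }
  }

The old writeRawBlock in DataFileWriter wrote from offset 0 unconditionally; the new accessors carry the real offset with the data.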

Modified: avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java?rev=988732&r1=988731&r2=988732&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java Tue Aug 24 21:22:52 2010
@@ -58,7 +58,7 @@ public class DataFileWriter<D> implement
   private DatumWriter<D> dout;
 
   private BufferedFileOutputStream out;
-  private Encoder vout;
+  private BinaryEncoder vout;
 
   private final Map<String,byte[]> meta = new HashMap<String,byte[]>();
 
@@ -292,40 +292,27 @@ public class DataFileWriter<D> implement
     if (codec.equals(otherCodec) && !recompress) {
       // copy raw bytes
       while(otherFile.hasNextBlock()) {
-        nextBlockRaw = otherFile.nextBlock(nextBlockRaw);
-        writeRawBlock(nextBlockRaw);
+        nextBlockRaw = otherFile.nextRawBlock(nextBlockRaw);
+        nextBlockRaw.writeBlockTo(vout, sync);
       }
     } else {
       while(otherFile.hasNextBlock()) {
-        nextBlockRaw = otherFile.nextBlock(nextBlockRaw);
-        ByteBuffer uncompressedData = otherCodec.decompress(ByteBuffer.wrap(
-            nextBlockRaw.data, 0, nextBlockRaw.blockSize));
-        ByteBuffer compressed = codec.compress(uncompressedData);
-        nextBlockRaw.data = compressed.array();
-        nextBlockRaw.blockSize = compressed.remaining();
-        writeRawBlock(nextBlockRaw);
+        nextBlockRaw = otherFile.nextRawBlock(nextBlockRaw);
+        nextBlockRaw.decompressUsing(otherCodec);
+        nextBlockRaw.compressUsing(codec);
+        nextBlockRaw.writeBlockTo(vout, sync);
       }
     }
   }
   
-  private void writeRawBlock(DataBlock rawBlock) throws IOException {
-    vout.writeLong(rawBlock.numEntries);
-    vout.writeLong(rawBlock.blockSize);
-    vout.writeFixed(rawBlock.data, 0, rawBlock.blockSize);
-    vout.writeFixed(sync);
-  }
-  
   private void writeBlock() throws IOException {
     if (blockCount > 0) {
-      vout.writeLong(blockCount);
       ByteBuffer uncompressed = buffer.getByteArrayAsByteBuffer();
-      ByteBuffer block = codec.compress(uncompressed);
-      vout.writeLong(block.remaining());
-      vout.writeFixed(block.array(), block.position() + block.arrayOffset(), 
-          block.remaining());
+      DataBlock block = new DataBlock(uncompressed, blockCount);
+      block.compressUsing(codec);
+      block.writeBlockTo(vout, sync);
       buffer.reset();
       blockCount = 0;
-      vout.writeFixed(sync);
     }
   }
 
@@ -371,7 +358,7 @@ public class DataFileWriter<D> implement
     public long tell() { return position+count; }
   }
 
-  static class NonCopyingByteArrayOutputStream extends ByteArrayOutputStream {
+  private static class NonCopyingByteArrayOutputStream extends ByteArrayOutputStream {
     NonCopyingByteArrayOutputStream(int initialSize) {
       super(initialSize);
     }
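
With writeRawBlock gone, both the raw-copy path and the recompress path above emit blocks through DataBlock.writeBlockTo, so the container's block framing is produced in exactly one place. A sketch of that framing, using the Avro 1.4-era public BinaryEncoder constructor (the helper name is hypothetical):

  import java.io.ByteArrayOutputStream;
  import java.io.IOException;

  import org.apache.avro.io.BinaryEncoder;

  public class BlockFraming {
    static byte[] frameBlock(long numEntries, byte[] payload, int offset,
                             int length, byte[] sync) throws IOException {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      BinaryEncoder e = new BinaryEncoder(out);
      e.writeLong(numEntries);               // zig-zag varint record count
      e.writeLong(length);                   // zig-zag varint payload size
      e.writeFixed(payload, offset, length); // payload, honouring its offset
      e.writeFixed(sync);                    // 16-byte sync marker
      e.flush();
      return out.toByteArray();
    }
  }

This is also why vout's declared type narrows from Encoder to BinaryEncoder: writeBlockTo takes a BinaryEncoder.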

Modified: avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java?rev=988732&r1=988731&r2=988732&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java Tue Aug 24 21:22:52 2010
@@ -17,17 +17,14 @@
  */
 package org.apache.avro.file;
 
-import java.io.ByteArrayInputStream;
-import java.io.EOFException;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.zip.Deflater;
 import java.util.zip.DeflaterOutputStream;
 import java.util.zip.Inflater;
-import java.util.zip.InflaterInputStream;
-
-import org.apache.avro.file.DataFileWriter.NonCopyingByteArrayOutputStream;
+import java.util.zip.InflaterOutputStream;
 
 /** 
  * Implements DEFLATE (RFC1951) compression and decompression. 
@@ -44,7 +41,7 @@ class DeflateCodec extends Codec {
   static class Option extends CodecFactory {
     private int compressionLevel;
 
-    public Option(int compressionLevel) {
+    Option(int compressionLevel) {
       this.compressionLevel = compressionLevel;
     }
 
@@ -54,7 +51,7 @@ class DeflateCodec extends Codec {
     }
   }
 
-  NonCopyingByteArrayOutputStream compressionBuffer;
+  private ByteArrayOutputStream outputBuffer;
   private Deflater deflater;
   private Inflater inflater;
   //currently only do 'nowrap' -- RFC 1951, not zlib
@@ -72,57 +69,60 @@ class DeflateCodec extends Codec {
 
   @Override
   ByteBuffer compress(ByteBuffer data) throws IOException {
-    if (compressionBuffer == null) {
-      compressionBuffer = new NonCopyingByteArrayOutputStream(
-          data.remaining());
-    }
-    if (deflater == null) {
-      deflater = new Deflater(compressionLevel, nowrap);
-    }
-    // Pass output through deflate
-    DeflaterOutputStream deflaterStream = 
-      new DeflaterOutputStream(compressionBuffer, deflater);
-    deflaterStream.write(data.array(),
-        data.position() + data.arrayOffset(),
-        data.limit() + data.arrayOffset());
-    deflaterStream.finish();
-    ByteBuffer result = compressionBuffer.getByteArrayAsByteBuffer();
-    deflater.reset();
-    compressionBuffer.reset();
+    ByteArrayOutputStream baos = getOutputBuffer(data.remaining());
+    DeflaterOutputStream ios = new DeflaterOutputStream(baos, getDeflater());
+    writeAndClose(data, ios);
+    ByteBuffer result = ByteBuffer.wrap(baos.toByteArray());
     return result;
   }
 
   @Override
   ByteBuffer decompress(ByteBuffer data) throws IOException {
-    if (compressionBuffer == null) {
-      compressionBuffer = new NonCopyingByteArrayOutputStream(
-          data.remaining());
+    ByteArrayOutputStream baos = getOutputBuffer(data.remaining());
+    InflaterOutputStream ios = new InflaterOutputStream(baos, getInflater());
+    writeAndClose(data, ios);
+    ByteBuffer result = ByteBuffer.wrap(baos.toByteArray());
+    return result;
+  }
+  
+  private void writeAndClose(ByteBuffer data, OutputStream to) throws IOException {
+    byte[] input = data.array();
+    int offset = data.arrayOffset() + data.position();
+    int length = data.remaining();
+    try {
+      to.write(input, offset, length);
+    } finally {
+      to.close();
     }
-    if (inflater == null) {
+  }
+  
+  // get and initialize the inflater for use.
+  private Inflater getInflater() {
+    if (null == inflater) {
       inflater = new Inflater(nowrap);
     }
-    InputStream uncompressed = new InflaterInputStream(
-        new ByteArrayInputStream(data.array(),
-            data.position() + data.arrayOffset(),
-            data.remaining()), inflater);
-    int read;
-    byte[] buff = new byte[2048];
-    try {
-      while (true) {
-        read = uncompressed.read(buff);
-        if (read < 0) break;
-        compressionBuffer.write(buff, 0, read);
-      } 
-    } catch (EOFException e) {
-      // sometimes InflaterInputStream.read
-      // throws this instead of returning -1
-    }
-    ByteBuffer result = compressionBuffer.getByteArrayAsByteBuffer();
     inflater.reset();
-    compressionBuffer.reset();
-    return result;
+    return inflater;
   }
 
+  // get and initialize the deflater for use.
+  private Deflater getDeflater() {
+    if (null == deflater) {
+      deflater = new Deflater(compressionLevel, nowrap);
+    }
+    deflater.reset();
+    return deflater;
+  }
+  
+  // get and initialize the output buffer for use.
+  private ByteArrayOutputStream getOutputBuffer(int suggestedLength) {
+    if (null == outputBuffer) {
+      outputBuffer = new ByteArrayOutputStream(suggestedLength);
+    }
+    outputBuffer.reset();
+    return outputBuffer;
+  }
+  
   @Override
   public int hashCode() {
     return nowrap ? 0 : 1;
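
The rewritten codec copies each result out with toByteArray() instead of returning a view over a reused NonCopyingByteArrayOutputStream, so callers can no longer alias the codec's scratch buffer; decompression now writes through an InflaterOutputStream rather than looping over an InflaterInputStream with the EOFException workaround. A self-contained JDK-only round trip in the same nowrap (RFC 1951) configuration:

  import java.io.ByteArrayOutputStream;
  import java.io.IOException;
  import java.util.zip.Deflater;
  import java.util.zip.DeflaterOutputStream;
  import java.util.zip.Inflater;
  import java.util.zip.InflaterOutputStream;

  public class DeflateRoundTrip {
    public static void main(String[] args) throws IOException {
      byte[] input = "hello, avro".getBytes("UTF-8");

      // Compress with raw DEFLATE (nowrap=true: RFC 1951, no zlib header).
      ByteArrayOutputStream compressed = new ByteArrayOutputStream();
      DeflaterOutputStream dos = new DeflaterOutputStream(
          compressed, new Deflater(Deflater.DEFAULT_COMPRESSION, true));
      dos.write(input);
      dos.close(); // finishes the deflate stream

      // Decompress by writing the compressed bytes through an
      // InflaterOutputStream (available since Java 6).
      ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
      InflaterOutputStream ios =
          new InflaterOutputStream(decompressed, new Inflater(true));
      ios.write(compressed.toByteArray());
      ios.close();

      System.out.println(new String(decompressed.toByteArray(), "UTF-8")); // hello, avro
    }
  }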

Modified: avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileConcat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileConcat.java?rev=988732&r1=988731&r2=988732&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileConcat.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileConcat.java Tue Aug 24 21:22:52 2010
@@ -17,7 +17,7 @@
  */
 package org.apache.avro;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
 
 import java.io.File;
 import java.io.IOException;
@@ -78,8 +78,10 @@ public class TestDataFileConcat {
 
   private static final String SCHEMA_JSON =
     "{\"type\": \"record\", \"name\": \"Test\", \"fields\": ["
-    +"{\"name\":\"stringField\", \"type\":\"string\"},"
-    +"{\"name\":\"longField\", \"type\":\"long\"}]}";
+    +"{\"name\":\"stringField\", \"type\":\"string\"}" +
+    ","
+    +"{\"name\":\"longField\", \"type\":\"long\"}" +
+    "]}";
   private static final Schema SCHEMA = Schema.parse(SCHEMA_JSON);
 
   private File makeFile(String name) {
@@ -88,67 +90,87 @@ public class TestDataFileConcat {
 
   @Test
   public void testConcateateFiles() throws IOException {
-    File file1 = makeFile((codec == null ? "null" : codec.toString()) + "-A");
-    File file2 = makeFile((codec2 == null ? "null" : codec2.toString()) + "-B");
-    DataFileWriter<Object> writer =
-      new DataFileWriter<Object>(new GenericDatumWriter<Object>())
-      .setSyncInterval(500);
-    if (codec != null) {
-      writer.setCodec(codec);
-    }
-    writer.create(SCHEMA, file1);
-    try {
-      for (Object datum : new RandomData(SCHEMA, COUNT, SEED)) {
-        writer.append(datum);
+    System.out.println("SEED = "+SEED);
+    System.out.println("COUNT = "+COUNT);
+    for (int k = 0; k < 60; k++) {
+      int syncInterval = 460 +k;
+      RandomData data1 = new RandomData(SCHEMA, COUNT, SEED);
+      RandomData data2 = new RandomData(SCHEMA, COUNT, SEED+1);
+      File file1 = makeFile((codec == null ? "null" : codec.toString()) + "-A");
+      File file2 = makeFile((codec2 == null ? "null" : codec2.toString()) + "-B");
+      DataFileWriter<Object> writer =
+        new DataFileWriter<Object>(new GenericDatumWriter<Object>())
+        .setSyncInterval(syncInterval);
+      if (codec != null) {
+        writer.setCodec(codec);
       }
-    } finally {
-      writer.close();
-    }
-    DataFileWriter<Object> writer2 =
-      new DataFileWriter<Object>(new GenericDatumWriter<Object>())
-      .setSyncInterval(500);
-    if (codec2 != null) {
-      writer2.setCodec(codec2);
-    }
-    writer2.create(SCHEMA, file2);
-    try {
-      for (Object datum : new RandomData(SCHEMA, COUNT, SEED+1)) {
-        writer2.append(datum);
+      writer.create(SCHEMA, file1);
+      try {
+        for (Object datum : data1) {
+          writer.append(datum);
+        }
+      } finally {
+        writer.close();
       }
-    } finally {
-      writer2.close();
-    }
-    DataFileWriter<Object> concatinto = 
-      new DataFileWriter<Object>(new GenericDatumWriter<Object>())
-      .setSyncInterval(500);
-    concatinto.appendTo(file1);
-    DataFileReader<Object> concatfrom =
-      new DataFileReader<Object>(file2, new GenericDatumReader<Object>());
-    concatinto.appendAllFrom(concatfrom, recompress);
-    concatinto.close();
-    
-    DataFileReader<Object> concat =
-      new DataFileReader<Object>(file1, new GenericDatumReader<Object>());
-   
-    try {
-      Object datum = null;
-      if (VALIDATE) {
-        for (Object expected : new RandomData(SCHEMA, COUNT, SEED)) {
-          datum = concat.next(datum);
-          assertEquals(expected, datum);
+      DataFileWriter<Object> writer2 =
+        new DataFileWriter<Object>(new GenericDatumWriter<Object>())
+        .setSyncInterval(syncInterval);
+      if (codec2 != null) {
+        writer2.setCodec(codec2);
+      }
+      writer2.create(SCHEMA, file2);
+      try {
+        for (Object datum : data2) {
+          writer2.append(datum);
         }
-        for (Object expected : new RandomData(SCHEMA, COUNT, SEED+1)) {
-          datum = concat.next(datum);
-          assertEquals(expected, datum);
+      } finally {
+        writer2.close();
+      }
+      DataFileWriter<Object> concatinto = 
+        new DataFileWriter<Object>(new GenericDatumWriter<Object>())
+        .setSyncInterval(syncInterval);
+      concatinto.appendTo(file1);
+      DataFileReader<Object> concatfrom =
+        new DataFileReader<Object>(file2, new GenericDatumReader<Object>());
+      concatinto.appendAllFrom(concatfrom, recompress);
+      concatinto.close();
+      concatfrom.close();
+
+      concatfrom = new DataFileReader<Object>(file2, new GenericDatumReader<Object>());
+
+
+      DataFileReader<Object> concat =
+        new DataFileReader<Object>(file1, new GenericDatumReader<Object>());
+      int count = 0;
+      try {
+        Object datum = null;
+        if (VALIDATE) {
+          for (Object expected : data1) {
+            datum = concat.next(datum);
+            assertEquals("at "+count++, expected, datum);
+          }
+          for (Object expected : data2) {
+            datum = concatfrom.next(datum);
+            assertEquals("at "+count++, expected, datum);
+          }
+          for (Object expected : data2) {
+            datum = concat.next(datum);
+            assertEquals("at "+count++, expected, datum);
+          }
+        } else {
+          for (int i = 0; i < COUNT*2; i++) {
+            datum = concat.next(datum);
+          }
         }
-      } else {
-        for (int i = 0; i < COUNT*2; i++) {
-          datum = concat.next(datum);
+      } finally {
+        if (count != 3 * COUNT) {
+          System.out.println(count + " " + k);
         }
+        concat.close();
+        concatfrom.close();
       }
-    } finally {
-      concat.close();
-    }
 
+    }
   }
+  
 }
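
The reworked test sweeps sync intervals 460 through 519 so block boundaries land at varying offsets, re-reads both the concatenated file and the re-opened source file, and tags every assertion with the record index. A distilled, hypothetical helper in that spirit (names are illustrative):

  import static org.junit.Assert.assertEquals;

  import java.io.IOException;

  import org.apache.avro.file.DataFileReader;

  class ReadBackCheck {
    /** Verify a reader yields exactly the expected records, reporting the
     *  index of the first mismatch. */
    static void assertContents(Iterable<Object> expected,
                               DataFileReader<Object> reader) throws IOException {
      int count = 0;
      Object datum = null;
      for (Object want : expected) {
        datum = reader.next(datum);
        assertEquals("at " + count++, want, datum);
      }
    }
  }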