You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2013/10/26 00:13:43 UTC

svn commit: r1535888 - in /avro/trunk: CHANGES.txt lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java

Author: cutting
Date: Fri Oct 25 22:13:43 2013
New Revision: 1535888

URL: http://svn.apache.org/r1535888
Log:
AVRO-1387. Java: Add DataFileWriter option to inhibit flush per block.  Contributed by Hari Shreedharan.

Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
    avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1535888&r1=1535887&r2=1535888&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Oct 25 22:13:43 2013
@@ -14,6 +14,9 @@ Trunk (not yet released)
     AVRO-1332. C#: Improve DatumReader performance.
     (David McIntosh via cutting)
 
+    AVRO-1387. Java: Add DataFileWriter option to inhibit flush per block.
+    (Hari Shreedharan via cutting)
+
   BUG FIXES
 
     AVRO-1368. Fix SpecificDatumWriter to, when writing a string

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java?rev=1535888&r1=1535887&r2=1535888&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java Fri Oct 25 22:13:43 2013
@@ -310,6 +310,7 @@ public class DataFileStream<D> implement
     private long numEntries;
     private int blockSize;
     private int offset = 0;
+    private boolean flushOnWrite = true;
     private DataBlock(long numEntries, int blockSize) {
       this.data = new byte[blockSize];
       this.numEntries = numEntries;
@@ -334,6 +335,14 @@ public class DataFileStream<D> implement
     int getBlockSize() {
       return blockSize;
     }
+
+    boolean isFlushOnWrite() {
+      return flushOnWrite;
+    }
+
+    void setFlushOnWrite(boolean flushOnWrite) {
+      this.flushOnWrite = flushOnWrite;
+    }
     
     ByteBuffer getAsByteBuffer() {
       return ByteBuffer.wrap(data, offset, blockSize);
@@ -356,8 +365,10 @@ public class DataFileStream<D> implement
       e.writeLong(this.blockSize);
       e.writeFixed(this.data, offset, this.blockSize);
       e.writeFixed(sync);
-      e.flush();
-    }   
+      if (flushOnWrite) {
+        e.flush();
+      }
+    }
     
   }
 }

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java?rev=1535888&r1=1535887&r2=1535888&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java Fri Oct 25 22:13:43 2013
@@ -70,6 +70,8 @@ public class DataFileWriter<D> implement
   private boolean isOpen;
   private Codec codec;
 
+  private boolean flushOnEveryBlock = true;
+
   /** Construct a writer, not yet open. */
   public DataFileWriter(DatumWriter<D> dout) {
     this.dout = dout;
@@ -97,6 +99,14 @@ public class DataFileWriter<D> implement
    * Set the synchronization interval for this file, in bytes. 
    * Valid values range from 32 to 2^30
    * Suggested values are between 2K and 2M
+   *
+   * The stream is flushed by default at the end of each synchronization
+   * interval.
+   *
+   * If {@linkplain #setFlushOnEveryBlock(boolean)} is
+   * called with its parameter set to false, the block may not be flushed to
+   * the stream after the sync marker is written. In that case,
+   * {@linkplain #flush()} must be called to flush the stream.
    * 
    * Invalid values throw IllegalArgumentException
    * 
@@ -144,6 +154,27 @@ public class DataFileWriter<D> implement
     return this;
   }
 
+  /**
+   * Set whether this writer should flush the block to the stream every time
+   * a sync marker is written. By default, the writer will flush the buffer
+   * each time a sync marker is written (when the block size limit is reached
+   * or {@linkplain #sync()} is called).
+   * @param flushOnEveryBlock - If set to false, this writer will not flush
+   *                          the block to the stream until {@linkplain
+   *                          #flush()} is explicitly called.
+   */
+  public void setFlushOnEveryBlock(boolean flushOnEveryBlock) {
+    this.flushOnEveryBlock = flushOnEveryBlock;
+  }
+
+  /**
+   * @return true if this writer flushes the block to the stream every time
+   * a sync marker is written; false otherwise.
+   */
+  public boolean isFlushOnEveryBlock() {
+    return this.flushOnEveryBlock;
+  }
+
   /** Open a writer appending to an existing file. */
   public DataFileWriter<D> appendTo(File file) throws IOException {
     return appendTo(new SeekableFileInput(file),
@@ -345,6 +376,7 @@ public class DataFileWriter<D> implement
       bufOut.flush();
       ByteBuffer uncompressed = buffer.getByteArrayAsByteBuffer();
       DataBlock block = new DataBlock(uncompressed, blockCount);
+      block.setFlushOnWrite(flushOnEveryBlock);
       block.compressUsing(codec);
       block.writeBlockTo(vout, sync);
       buffer.reset();
@@ -354,21 +386,30 @@ public class DataFileWriter<D> implement
 
   /** Return the current position as a value that may be passed to {@link
    * DataFileReader#seek(long)}.  Forces the end of the current block,
-   * emitting a synchronization marker. */
+   * emitting a synchronization marker. By default, this will also flush the
+   * block to the stream.
+   *
+   * If {@linkplain #setFlushOnEveryBlock(boolean)} is
+   * called with its parameter set to false, this method may not flush
+   * the block. In that case, {@linkplain #flush()} must be called to
+   * flush the stream.
+   */
   public long sync() throws IOException {
     assertOpen();
     writeBlock();
     return out.tell();
   }
 
-  /** Flush the current state of the file. */
+  /** Calls {@linkplain #sync()} and then flushes the current state of the
+   * file.
+   */
   @Override
   public void flush() throws IOException {
     sync();
     vout.flush();
   }
 
-  /** Close the file. */
+  /** Flush and close the file. */
   @Override
   public void close() throws IOException {
     if (isOpen) {

Modified: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java?rev=1535888&r1=1535887&r2=1535888&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java (original)
+++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java Fri Oct 25 22:13:43 2013
@@ -20,6 +20,7 @@ package org.apache.avro;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -282,6 +283,38 @@ public class TestDataFile {
              new GenericDatumReader<Object>());
   }
 
+  @Test
+  public void testFlushCount() throws IOException {
+    DataFileWriter<Object> writer =
+      new DataFileWriter<Object>(new GenericDatumWriter<Object>());
+    writer.setFlushOnEveryBlock(false);
+    TestingByteArrayOutputStream out = new TestingByteArrayOutputStream();
+    writer.create(SCHEMA, out);
+    int currentCount = 0;
+    int flushCounter = 0;
+    try {
+      for (Object datum : new RandomData(SCHEMA, COUNT, SEED+1)) {
+        currentCount++;
+        writer.append(datum);
+        writer.sync();
+        if (currentCount % 10 == 0) {
+          flushCounter++;
+          writer.flush();
+        }
+      }
+    } finally {
+      writer.close();
+    }
+    System.out.println("Total number of flushes: " + out.flushCount);
+    // Unfortunately, the underlying buffered output stream might flush data
+    // on its own when its buffer becomes full, so the only checks we can
+    // reliably make are that not every sync led to a flush and that the
+    // stream was flushed at least as many times as we called flush. In
+    // practice, out.flushCount is almost always 24.
+    Assert.assertTrue(out.flushCount < currentCount &&
+      out.flushCount >= flushCounter);
+  }
+
   static void readFile(File f, DatumReader<? extends Object> datumReader)
     throws IOException {
     FileReader<? extends Object> reader = DataFileReader.openReader(f, datumReader);
@@ -301,4 +334,14 @@ public class TestDataFile {
       TestDataFile.readFile(input, new GenericDatumReader<Object>(null, projection));
     System.out.println("Time: "+(System.currentTimeMillis()-start));
   }
+
+  private class TestingByteArrayOutputStream extends ByteArrayOutputStream {
+    private int flushCount = 0;
+
+    @Override
+    public void flush() throws IOException {
+      super.flush();
+      flushCount++;
+    }
+  }
 }