You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2013/10/26 00:13:43 UTC
svn commit: r1535888 - in /avro/trunk: CHANGES.txt
lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
Author: cutting
Date: Fri Oct 25 22:13:43 2013
New Revision: 1535888
URL: http://svn.apache.org/r1535888
Log:
AVRO-1387. Java: Add DataFileWriter option to inhibit flush per block. Contributed by Hari Shreedharan.
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1535888&r1=1535887&r2=1535888&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Oct 25 22:13:43 2013
@@ -14,6 +14,9 @@ Trunk (not yet released)
AVRO-1332. C#: Improve DatumReader performance.
(David McIntosh via cutting)
+ AVRO-1387. Java: Add DataFileWriter option to inhibit flush per block.
+ (Hari Shreedharan via cutting)
+
BUG FIXES
AVRO-1368. Fix SpecificDatumWriter to, when writing a string
Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java?rev=1535888&r1=1535887&r2=1535888&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java Fri Oct 25 22:13:43 2013
@@ -310,6 +310,7 @@ public class DataFileStream<D> implement
private long numEntries;
private int blockSize;
private int offset = 0;
+ private boolean flushOnWrite = true;
private DataBlock(long numEntries, int blockSize) {
this.data = new byte[blockSize];
this.numEntries = numEntries;
@@ -334,6 +335,14 @@ public class DataFileStream<D> implement
int getBlockSize() {
return blockSize;
}
+
+ boolean isFlushOnWrite() {
+ return flushOnWrite;
+ }
+
+ void setFlushOnWrite(boolean flushOnWrite) {
+ this.flushOnWrite = flushOnWrite;
+ }
ByteBuffer getAsByteBuffer() {
return ByteBuffer.wrap(data, offset, blockSize);
@@ -356,8 +365,10 @@ public class DataFileStream<D> implement
e.writeLong(this.blockSize);
e.writeFixed(this.data, offset, this.blockSize);
e.writeFixed(sync);
- e.flush();
- }
+ if (flushOnWrite) {
+ e.flush();
+ }
+ }
}
}
Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java?rev=1535888&r1=1535887&r2=1535888&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java Fri Oct 25 22:13:43 2013
@@ -70,6 +70,8 @@ public class DataFileWriter<D> implement
private boolean isOpen;
private Codec codec;
+ private boolean flushOnEveryBlock = true;
+
/** Construct a writer, not yet open. */
public DataFileWriter(DatumWriter<D> dout) {
this.dout = dout;
@@ -97,6 +99,14 @@ public class DataFileWriter<D> implement
* Set the synchronization interval for this file, in bytes.
* Valid values range from 32 to 2^30
* Suggested values are between 2K and 2M
+ *
+ * The stream is flushed by default at the end of each synchronization
+ * interval.
+ *
+ * If {@linkplain #setFlushOnEveryBlock(boolean)} is
+ * called with param set to false, then the block may not be flushed to the
+ * stream after the sync marker is written. In this case,
+ * {@linkplain #flush()} must be called to flush the stream.
*
* Invalid values throw IllegalArgumentException
*
@@ -144,6 +154,27 @@ public class DataFileWriter<D> implement
return this;
}
+ /**
+ * Set whether this writer should flush the block to the stream every time
+ * a sync marker is written. By default, the writer will flush the buffer
+ * each time a sync marker is written (if the block size limit is reached
+ * or {@linkplain #sync()} is called).
+ * @param flushOnEveryBlock - If set to false, this writer will not flush
+ * the block to the stream until {@linkplain
+ * #flush()} is explicitly called.
+ */
+ public void setFlushOnEveryBlock(boolean flushOnEveryBlock) {
+ this.flushOnEveryBlock = flushOnEveryBlock;
+ }
+
+ /**
+ * @return - true if this writer flushes the block to the stream every time
+ * a sync marker is written. Else returns false.
+ */
+ public boolean isFlushOnEveryBlock() {
+ return this.flushOnEveryBlock;
+ }
+
/** Open a writer appending to an existing file. */
public DataFileWriter<D> appendTo(File file) throws IOException {
return appendTo(new SeekableFileInput(file),
@@ -345,6 +376,7 @@ public class DataFileWriter<D> implement
bufOut.flush();
ByteBuffer uncompressed = buffer.getByteArrayAsByteBuffer();
DataBlock block = new DataBlock(uncompressed, blockCount);
+ block.setFlushOnWrite(flushOnEveryBlock);
block.compressUsing(codec);
block.writeBlockTo(vout, sync);
buffer.reset();
@@ -354,21 +386,30 @@ public class DataFileWriter<D> implement
/** Return the current position as a value that may be passed to {@link
* DataFileReader#seek(long)}. Forces the end of the current block,
- * emitting a synchronization marker. */
+ * emitting a synchronization marker. By default, this will also flush the
+ * block to the stream.
+ *
+ * If {@linkplain #setFlushOnEveryBlock(boolean)} is
+ * called with param set to false, then this method may not flush
+ * the block. In this case, {@linkplain #flush()} must be called to
+ * flush the stream.
+ */
public long sync() throws IOException {
assertOpen();
writeBlock();
return out.tell();
}
- /** Flush the current state of the file. */
+ /** Calls {@linkplain #sync()} and then flushes the current state of the
+ * file.
+ */
@Override
public void flush() throws IOException {
sync();
vout.flush();
}
- /** Close the file. */
+ /** Flush and close the file. */
@Override
public void close() throws IOException {
if (isOpen) {
Modified: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java?rev=1535888&r1=1535887&r2=1535888&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java (original)
+++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java Fri Oct 25 22:13:43 2013
@@ -20,6 +20,7 @@ package org.apache.avro;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -282,6 +283,38 @@ public class TestDataFile {
new GenericDatumReader<Object>());
}
+ @Test
+ public void testFlushCount() throws IOException {
+ DataFileWriter<Object> writer =
+ new DataFileWriter<Object>(new GenericDatumWriter<Object>());
+ writer.setFlushOnEveryBlock(false);
+ TestingByteArrayOutputStream out = new TestingByteArrayOutputStream();
+ writer.create(SCHEMA, out);
+ int currentCount = 0;
+ int flushCounter = 0;
+ try {
+ for (Object datum : new RandomData(SCHEMA, COUNT, SEED+1)) {
+ currentCount++;
+ writer.append(datum);
+ writer.sync();
+ if (currentCount % 10 == 0) {
+ flushCounter++;
+ writer.flush();
+ }
+ }
+ } finally {
+ writer.close();
+ }
+ System.out.println("Total number of flushes: " + out.flushCount);
+ // Unfortunately, the underlying buffered output stream might flush data
+ // to disk when the buffer becomes full, so the only check we can
+ // accurately do is that each sync did not lead to a flush and that the
+ // file was flushed at least as many times as we called flush. Generally
+ // noticed that out.flushCount is almost always 24 though.
+ Assert.assertTrue(out.flushCount < currentCount &&
+ out.flushCount >= flushCounter);
+ }
+
static void readFile(File f, DatumReader<? extends Object> datumReader)
throws IOException {
FileReader<? extends Object> reader = DataFileReader.openReader(f, datumReader);
@@ -301,4 +334,14 @@ public class TestDataFile {
TestDataFile.readFile(input, new GenericDatumReader<Object>(null, projection));
System.out.println("Time: "+(System.currentTimeMillis()-start));
}
+
+ private class TestingByteArrayOutputStream extends ByteArrayOutputStream {
+ private int flushCount = 0;
+
+ @Override
+ public void flush() throws IOException {
+ super.flush();
+ flushCount++;
+ }
+ }
}