You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/02/29 21:43:11 UTC
[2/2] nifi git commit: NIFI-1574: Ensure that we never flush a
BufferedOutputStream's buffer on close of the write-ahead log
NIFI-1574: Ensure that we never flush a BufferedOutputStream's buffer on close of the write-ahead log
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/62333c9e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/62333c9e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/62333c9e
Branch: refs/heads/master
Commit: 62333c9e0ae77854761cf019a65f54ccedc7af6b
Parents: 1149bc6
Author: Mark Payne <ma...@hotmail.com>
Authored: Sun Feb 28 10:08:28 2016 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Feb 29 15:42:54 2016 -0500
----------------------------------------------------------------------
.../org/wali/MinimalLockingWriteAheadLog.java | 58 +++---
.../wali/TestMinimalLockingWriteAheadLog.java | 178 +++++++++++++++++++
2 files changed, 208 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/62333c9e/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
index 501c330..f20f917 100644
--- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
@@ -689,17 +689,43 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
}
public void close() {
- final DataOutputStream out = dataOut;
+ // Closes the underlying file stream. Note that here we are closing fileOut and NOT dataOut.
+ // This is very much intentional, not an oversight. This is done because of
+ // the way that the OutputStreams are structured. dataOut wraps a BufferedOutputStream,
+ // which then wraps the FileOutputStream. If we close 'dataOut', then this will call
+ // the flush() method of BufferedOutputStream. Under normal conditions, this is fine.
+ // However, there is a very important corner case to consider:
+ //
+ // If we are writing to the DataOutputStream in the update() method and that
+ // call to write() then results in the BufferedOutputStream calling flushBuffer() -
+ // or if we finish the call to update() and call flush() ourselves - it is possible
+ // that the internal buffer of the BufferedOutputStream can get partially written
+ // to the FileOutputStream and then an IOException occurs. If this occurs, we have
+ // written a partial record to disk. This is still okay, as we have logic to handle
+ // the condition where we have a partial record and then an unexpected End-of-File.
+ // But if we then call close() on 'dataOut', this will call the flush() method of the
+ // underlying BufferedOutputStream. As a result, we will end up again writing the internal
+ // buffer of the BufferedOutputStream to the underlying file. At this point, we are left
+ // not with an unexpected/premature End-of-File but instead a bunch of seemingly random
+ // bytes that happened to be residing in that internal buffer, and this will result in
+ // a corrupt and unrecoverable Write-Ahead Log.
+ //
+ // Additionally, we are okay not ever calling close on the wrapping BufferedOutputStream and
+ // DataOutputStream because they don't actually hold any resources that need to be reclaimed,
+ // and after each update to the Write-Ahead Log, we call flush() ourselves to ensure that we don't
+ // leave arbitrary data in the BufferedOutputStream that hasn't been flushed to the underlying
+ // FileOutputStream.
+ final OutputStream out = fileOut;
if (out != null) {
try {
out.close();
} catch (final Exception e) {
-
}
}
this.closed = true;
this.dataOut = null;
+ this.fileOut = null;
}
public void blackList() {
@@ -721,32 +747,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
public void rollover() throws IOException {
lock.lock();
try {
- // Note that here we are closing fileOut and NOT dataOut.
- // This is very much intentional, not an oversight. This is done because of
- // the way that the OutputStreams are structured. dataOut wraps a BufferedOutputStream,
- // which then wraps the FileOutputStream. If we close 'dataOut', then this will call
- // the flush() method of BufferedOutputStream. Under normal conditions, this is fine.
- // However, there is a very important corner case to consider:
- //
- // If we are writing to the DataOutputStream in the update() method and that
- // call to write() then results in the BufferedOutputStream calling flushBuffer() -
- // or if we finish the call to update() and call flush() ourselves - it is possible
- // that the internal buffer of the BufferedOutputStream can get partially written to
- // to the FileOutputStream and then an IOException occurs. If this occurs, we have
- // written a partial record to disk. This still is okay, as we have logic to handle
- // the condition where we have a partial record and then an unexpected End-of-File.
- // But if we then call close() on 'dataOut', this will call the flush() method of the
- // underlying BufferedOutputStream. As a result, we will end up again writing the internal
- // buffer of the BufferedOutputStream to the underlying file. At this point, we are left
- // not with an unexpected/premature End-of-File but instead a bunch of seemingly random
- // bytes that happened to be residing in that internal buffer, and this will result in
- // a corrupt and unrecoverable Write-Ahead Log.
- //
- // Additionally, we are okay not ever calling close on the wrapping BufferedOutputStream and
- // DataOutputStream because they don't actually hold any resources that need to be reclaimed,
- // and after each update to the Write-Ahead Log, we call flush() ourselves to ensure that we don't
- // leave arbitrary data in the BufferedOutputStream that hasn't been flushed to the underlying
- // FileOutputStream.
+ // Note that here we are closing fileOut and NOT dataOut. See the note in the close()
+ // method to understand the logic behind this.
final OutputStream out = fileOut;
if (out != null) {
try {
http://git-wip-us.apache.org/repos/asf/nifi/blob/62333c9e/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
index 03e6581..7b7d2ca 100644
--- a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
+++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
@@ -20,9 +20,14 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.FileFilter;
+import java.io.FileInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -417,6 +422,162 @@ public class TestMinimalLockingWriteAheadLog {
}
+ @Test
+ public void testShutdownWhileBlacklisted() throws IOException {
+ final Path path = Paths.get("target/minimal-locking-repo-shutdown-blacklisted");
+ deleteRecursively(path.toFile());
+ Files.createDirectories(path);
+
+ final SerDe<SimpleRecord> failOnThirdWriteSerde = new SerDe<SimpleRecord>() {
+ private int writes = 0;
+
+ @Override
+ public void serializeEdit(SimpleRecord previousRecordState, SimpleRecord newRecordState, DataOutputStream out) throws IOException {
+ serializeRecord(newRecordState, out);
+ }
+
+ @Override
+ public void serializeRecord(SimpleRecord record, DataOutputStream out) throws IOException {
+ int size = (int) record.getSize();
+ out.writeLong(record.getSize());
+
+ for (int i = 0; i < size; i++) {
+ out.write('A');
+ }
+
+ if (++writes == 3) {
+ throw new IOException("Intentional Exception for Unit Testing");
+ }
+
+ out.writeLong(record.getId());
+ }
+
+ @Override
+ public SimpleRecord deserializeEdit(DataInputStream in, Map<Object, SimpleRecord> currentRecordStates, int version) throws IOException {
+ return deserializeRecord(in, version);
+ }
+
+ @Override
+ public SimpleRecord deserializeRecord(DataInputStream in, int version) throws IOException {
+ long size = in.readLong();
+
+ for (int i = 0; i < (int) size; i++) {
+ in.read();
+ }
+
+ long id = in.readLong();
+ return new SimpleRecord(id, size);
+ }
+
+ @Override
+ public Object getRecordIdentifier(SimpleRecord record) {
+ return record.getId();
+ }
+
+ @Override
+ public UpdateType getUpdateType(SimpleRecord record) {
+ return UpdateType.CREATE;
+ }
+
+ @Override
+ public String getLocation(SimpleRecord record) {
+ return null;
+ }
+
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+ };
+
+ final WriteAheadRepository<SimpleRecord> writeRepo = new MinimalLockingWriteAheadLog<>(path, 1, failOnThirdWriteSerde, null);
+ final Collection<SimpleRecord> initialRecs = writeRepo.recoverRecords();
+ assertTrue(initialRecs.isEmpty());
+
+
+ writeRepo.update(Collections.singleton(new SimpleRecord(1L, 1L)), false);
+ writeRepo.update(Collections.singleton(new SimpleRecord(2L, 2L)), false);
+ try {
+ // Use a size of 8194 because the BufferedOutputStream has a buffer size of 8192 and we want
+ // to exceed this for testing purposes.
+ writeRepo.update(Collections.singleton(new SimpleRecord(3L, 8194L)), false);
+ Assert.fail("Expected IOException but did not get it");
+ } catch (final IOException ioe) {
+ // expected behavior
+ }
+
+ final Path partitionDir = path.resolve("partition-0");
+ final File journalFile = partitionDir.toFile().listFiles()[0];
+ final long journalFileSize = journalFile.length();
+ verifyBlacklistedJournalContents(journalFile, failOnThirdWriteSerde);
+
+ writeRepo.shutdown();
+
+ // Ensure that calling shutdown() didn't write anything to the journal file
+ final long newJournalSize = journalFile.length();
+ assertEquals("Calling Shutdown wrote " + (newJournalSize - journalFileSize) + " bytes to the journal file", newJournalSize, journalFile.length());
+ }
+
+ private void verifyBlacklistedJournalContents(final File journalFile, final SerDe<?> serde) throws IOException {
+ try (final FileInputStream fis = new FileInputStream(journalFile);
+ final InputStream bis = new BufferedInputStream(fis);
+ final DataInputStream in = new DataInputStream(bis)) {
+ // Expected layout: header, two complete transactions, then a third transaction cut off mid-payload.
+ // Verify header info.
+ final String waliClassName = in.readUTF();
+ assertEquals(MinimalLockingWriteAheadLog.class.getName(), waliClassName);
+
+ final int waliVersion = in.readInt();
+ assertTrue(waliVersion > 0);
+
+ final String serdeClassName = in.readUTF();
+ assertEquals(serde.getClass().getName(), serdeClassName);
+
+ final int serdeVersion = in.readInt();
+ assertEquals(serde.getVersion(), serdeVersion);
+
+ for (int i = 0; i < 2; i++) {
+ long transactionId = in.readLong();
+ assertEquals(i, transactionId);
+
+ // read what serde wrote: the size, then 'size' bytes of 'A', then the record id
+ long size = in.readLong();
+
+ assertEquals((i + 1), size);
+
+ for (int j = 0; j < (int) size; j++) {
+ final int c = in.read();
+ assertEquals('A', c);
+ }
+
+ long id = in.readLong();
+ assertEquals((i + 1), id);
+
+ int transactionIndicator = in.read();
+ assertEquals(2, transactionIndicator);
+ }
+
+ long transactionId = in.readLong();
+ assertEquals(2L, transactionId);
+
+ long thirdSize = in.readLong();
+ assertEquals(8194, thirdSize);
+
+ // should be 8176 A's: the 8192-byte buffer of the BufferedOutputStream already held 16 bytes
+ // (8 for the transaction id and 8 for the size), so only 8176 A's fit before it was flushed;
+ // the remaining A's were still buffered when the Exception was thrown and must not reach the file.
+ for (int i = 0; i < 8176; i++) {
+ final int c = in.read();
+ assertEquals("i = " + i, 'A', c);
+ }
+
+ // Stream should now be out of data, because we threw an Exception!
+ final int nextByte = in.read();
+ assertEquals(-1, nextByte);
+ }
+ }
+
+
@Test
public void testDecreaseNumberOfPartitions() throws IOException {
@@ -544,4 +705,21 @@ public class TestMinimalLockingWriteAheadLog {
return size;
}
+ static class SimpleRecord { // immutable (id, size) pair used as the record type by the test SerDe
+ private final long id; // record identifier, returned by getId()
+ private final long size; // payload size, returned by getSize()
+
+ public SimpleRecord(final long id, final long size) {
+ this.id = id;
+ this.size = size;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public long getSize() {
+ return size;
+ }
+ }
}