You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/02/29 21:43:11 UTC

[2/2] nifi git commit: NIFI-1574: Ensure that we never flush a BufferedOutputStream's buffer on close of the write-ahead log

NIFI-1574: Ensure that we never flush a BufferedOutputStream's buffer on close of the write-ahead log


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/62333c9e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/62333c9e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/62333c9e

Branch: refs/heads/master
Commit: 62333c9e0ae77854761cf019a65f54ccedc7af6b
Parents: 1149bc6
Author: Mark Payne <ma...@hotmail.com>
Authored: Sun Feb 28 10:08:28 2016 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Feb 29 15:42:54 2016 -0500

----------------------------------------------------------------------
 .../org/wali/MinimalLockingWriteAheadLog.java   |  58 +++---
 .../wali/TestMinimalLockingWriteAheadLog.java   | 178 +++++++++++++++++++
 2 files changed, 208 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/62333c9e/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
index 501c330..f20f917 100644
--- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
@@ -689,17 +689,43 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
         }
 
         public void close() {
-            final DataOutputStream out = dataOut;
+            // Note that here we are closing fileOut and NOT dataOut.
+            // This is very much intentional, not an oversight. This is done because of
+            // the way that the OutputStreams are structured. dataOut wraps a BufferedOutputStream,
+            // which then wraps the FileOutputStream. If we close 'dataOut', then this will call
+            // the flush() method of BufferedOutputStream. Under normal conditions, this is fine.
+            // However, there is a very important corner case to consider:
+            //
+            //      If we are writing to the DataOutputStream in the update() method and that
+            //      call to write() then results in the BufferedOutputStream calling flushBuffer() -
+            //      or if we finish the call to update() and call flush() ourselves - it is possible
+            //      that the internal buffer of the BufferedOutputStream can get partially written to
+            //      the FileOutputStream and then an IOException occurs. If this occurs, we have
+            //      written a partial record to disk. This is still okay, as we have logic to handle
+            //      the condition where we have a partial record and then an unexpected End-of-File.
+            //      But if we then call close() on 'dataOut', this will call the flush() method of the
+            //      underlying BufferedOutputStream. As a result, we will end up again writing the internal
+            //      buffer of the BufferedOutputStream to the underlying file. At this point, we are left
+            //      not with an unexpected/premature End-of-File but instead a bunch of seemingly random
+            //      bytes that happened to be residing in that internal buffer, and this will result in
+            //      a corrupt and unrecoverable Write-Ahead Log.
+            //
+            // Additionally, we are okay not ever calling close on the wrapping BufferedOutputStream and
+            // DataOutputStream because they don't actually hold any resources that need to be reclaimed,
+            // and after each update to the Write-Ahead Log, we call flush() ourselves to ensure that we don't
+            // leave arbitrary data in the BufferedOutputStream that hasn't been flushed to the underlying
+            // FileOutputStream.
+            final OutputStream out = fileOut;
             if (out != null) {
                 try {
                     out.close();
                 } catch (final Exception e) {
-
                 }
             }
 
             this.closed = true;
             this.dataOut = null;
+            this.fileOut = null;
         }
 
         public void blackList() {
@@ -721,32 +747,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
         public void rollover() throws IOException {
             lock.lock();
             try {
-                // Note that here we are closing fileOut and NOT dataOut.
-                // This is very much intentional, not an oversight. This is done because of
-                // the way that the OutputStreams are structured. dataOut wraps a BufferedOutputStream,
-                // which then wraps the FileOutputStream. If we close 'dataOut', then this will call
-                // the flush() method of BufferedOutputStream. Under normal conditions, this is fine.
-                // However, there is a very important corner case to consider:
-                //
-                //      If we are writing to the DataOutputStream in the update() method and that
-                //      call to write() then results in the BufferedOutputStream calling flushBuffer() -
-                //      or if we finish the call to update() and call flush() ourselves - it is possible
-                //      that the internal buffer of the BufferedOutputStream can get partially written to
-                //      to the FileOutputStream and then an IOException occurs. If this occurs, we have
-                //      written a partial record to disk. This still is okay, as we have logic to handle
-                //      the condition where we have a partial record and then an unexpected End-of-File.
-                //      But if we then call close() on 'dataOut', this will call the flush() method of the
-                //      underlying BufferedOutputStream. As a result, we will end up again writing the internal
-                //      buffer of the BufferedOutputStream to the underlying file. At this point, we are left
-                //      not with an unexpected/premature End-of-File but instead a bunch of seemingly random
-                //      bytes that happened to be residing in that internal buffer, and this will result in
-                //      a corrupt and unrecoverable Write-Ahead Log.
-                //
-                // Additionally, we are okay not ever calling close on the wrapping BufferedOutputStream and
-                // DataOutputStream because they don't actually hold any resources that need to be reclaimed,
-                // and after each update to the Write-Ahead Log, we call flush() ourselves to ensure that we don't
-                // leave arbitrary data in the BufferedOutputStream that hasn't been flushed to the underlying
-                // FileOutputStream.
+                // Note that here we are closing fileOut and NOT dataOut. See the note in the close()
+                // method to understand the logic behind this.
                 final OutputStream out = fileOut;
                 if (out != null) {
                     try {

http://git-wip-us.apache.org/repos/asf/nifi/blob/62333c9e/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
index 03e6581..7b7d2ca 100644
--- a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
+++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
@@ -20,9 +20,14 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileFilter;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -417,6 +422,162 @@ public class TestMinimalLockingWriteAheadLog {
 
     }
 
+    @Test
+    public void testShutdownWhileBlacklisted() throws IOException {
+        final Path path = Paths.get("target/minimal-locking-repo-shutdown-blacklisted");
+        deleteRecursively(path.toFile());
+        Files.createDirectories(path);
+
+        final SerDe<SimpleRecord> failOnThirdWriteSerde = new SerDe<SimpleRecord>() {
+            private int writes = 0;
+
+            @Override
+            public void serializeEdit(SimpleRecord previousRecordState, SimpleRecord newRecordState, DataOutputStream out) throws IOException {
+                serializeRecord(newRecordState, out);
+            }
+
+            @Override
+            public void serializeRecord(SimpleRecord record, DataOutputStream out) throws IOException {
+                int size = (int) record.getSize();
+                out.writeLong(record.getSize());
+
+                for (int i = 0; i < size; i++) {
+                    out.write('A');
+                }
+
+                if (++writes == 3) {
+                    throw new IOException("Intentional Exception for Unit Testing");
+                }
+
+                out.writeLong(record.getId());
+            }
+
+            @Override
+            public SimpleRecord deserializeEdit(DataInputStream in, Map<Object, SimpleRecord> currentRecordStates, int version) throws IOException {
+                return deserializeRecord(in, version);
+            }
+
+            @Override
+            public SimpleRecord deserializeRecord(DataInputStream in, int version) throws IOException {
+                long size = in.readLong();
+
+                for (int i = 0; i < (int) size; i++) {
+                    in.read();
+                }
+
+                long id = in.readLong();
+                return new SimpleRecord(id, size);
+            }
+
+            @Override
+            public Object getRecordIdentifier(SimpleRecord record) {
+                return record.getId();
+            }
+
+            @Override
+            public UpdateType getUpdateType(SimpleRecord record) {
+                return UpdateType.CREATE;
+            }
+
+            @Override
+            public String getLocation(SimpleRecord record) {
+                return null;
+            }
+
+            @Override
+            public int getVersion() {
+                return 0;
+            }
+        };
+
+        final WriteAheadRepository<SimpleRecord> writeRepo = new MinimalLockingWriteAheadLog<>(path, 1, failOnThirdWriteSerde, null);
+        final Collection<SimpleRecord> initialRecs = writeRepo.recoverRecords();
+        assertTrue(initialRecs.isEmpty());
+
+        // Two updates succeed; the serde then throws partway through serializing the third record.
+        writeRepo.update(Collections.singleton(new SimpleRecord(1L, 1L)), false);
+        writeRepo.update(Collections.singleton(new SimpleRecord(2L, 2L)), false);
+        try {
+            // Use a size of 8194 because the BufferedOutputStream has a buffer size of 8192 and we want
+            // to exceed this for testing purposes.
+            writeRepo.update(Collections.singleton(new SimpleRecord(3L, 8194L)), false);
+            Assert.fail("Expected IOException but did not get it");
+        } catch (final IOException ioe) {
+            // expected behavior
+        }
+
+        final Path partitionDir = path.resolve("partition-0");
+        final File journalFile = partitionDir.toFile().listFiles()[0];
+        final long journalFileSize = journalFile.length();
+        verifyBlacklistedJournalContents(journalFile, failOnThirdWriteSerde);
+
+        writeRepo.shutdown();
+
+        // Ensure that calling shutdown() didn't write anything to the journal file
+        final long newJournalSize = journalFile.length();
+        assertEquals("Calling Shutdown wrote " + (newJournalSize - journalFileSize) + " bytes to the journal file", journalFileSize, newJournalSize);
+    }
+
+    private void verifyBlacklistedJournalContents(final File journalFile, final SerDe<?> serde) throws IOException {
+        try (final FileInputStream fis = new FileInputStream(journalFile);
+            final InputStream bis = new BufferedInputStream(fis);
+            final DataInputStream in = new DataInputStream(bis)) {
+            // Expected layout: header, two complete transactions, then a truncated third one.
+            // Verify header info.
+            final String waliClassName = in.readUTF();
+            assertEquals(MinimalLockingWriteAheadLog.class.getName(), waliClassName);
+
+            final int waliVersion = in.readInt();
+            assertTrue(waliVersion > 0);
+
+            final String serdeClassName = in.readUTF();
+            assertEquals(serde.getClass().getName(), serdeClassName);
+
+            final int serdeVersion = in.readInt();
+            assertEquals(serde.getVersion(), serdeVersion);
+
+            for (int i = 0; i < 2; i++) {
+                long transactionId = in.readLong();
+                assertEquals(i, transactionId);
+
+                // read what serde wrote
+                long size = in.readLong();
+
+                assertEquals((i + 1), size);
+
+                for (int j = 0; j < (int) size; j++) {
+                    final int c = in.read();
+                    assertEquals('A', c);
+                }
+
+                long id = in.readLong();
+                assertEquals((i + 1), id);
+
+                int transactionIndicator = in.read();
+                assertEquals(2, transactionIndicator);
+            }
+
+            long transactionId = in.readLong();
+            assertEquals(2L, transactionId);
+
+            long thirdSize = in.readLong();
+            assertEquals(8194, thirdSize);
+
+            // should be 8176 A's: the serde wrote all 8194 before throwing, but the 8192-byte
+            // BufferedOutputStream buffer already held 16 bytes (8 for the transaction id and 8
+            // for the size), so only 8176 A's reached the file before the Exception was thrown.
+            for (int i = 0; i < 8176; i++) {
+                final int c = in.read();
+                assertEquals("i = " + i, 'A', c);
+            }
+
+            // Stream should now be out of data, because we threw an Exception!
+            final int nextByte = in.read();
+            assertEquals(-1, nextByte);
+        }
+    }
+
+
 
     @Test
     public void testDecreaseNumberOfPartitions() throws IOException {
@@ -544,4 +705,21 @@ public class TestMinimalLockingWriteAheadLog {
         return size;
     }
 
+    static class SimpleRecord {
+        private final long id;
+        private final long size;
+
+        public SimpleRecord(final long id, final long size) {
+            this.id = id;
+            this.size = size;
+        }
+
+        public long getId() {
+            return id;
+        }
+
+        public long getSize() {
+            return size;
+        }
+    }
 }