You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2017/04/07 14:28:47 UTC

nifi git commit: NIFI-3678: Ensure that we catch EOFException when reading header information from WAL Partition files; previously, we caught EOFExceptions when reading a 'record' from the WAL but not when reading header info

Repository: nifi
Updated Branches:
  refs/heads/master 6a75ab174 -> 292dd1d66


NIFI-3678: Ensure that we catch EOFException when reading header information from WAL Partition files; previously, we caught EOFExceptions when reading a 'record' from the WAL but not when reading header info

NIFI-3678: If we have a transaction ID but then have no more data written to Partition file, we end up with a NPE. Added logic to avoid this and instead return null for the next record when this happens

This closes #1656.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/292dd1d6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/292dd1d6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/292dd1d6

Branch: refs/heads/master
Commit: 292dd1d66b1726794f0d34523578727ea3a7fe08
Parents: 6a75ab1
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Apr 6 15:57:11 2017 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Fri Apr 7 10:28:27 2017 -0400

----------------------------------------------------------------------
 .../org/wali/MinimalLockingWriteAheadLog.java   | 32 ++++----
 .../wali/TestMinimalLockingWriteAheadLog.java   | 82 ++++++++++++++++++++
 .../repository/SchemaRepositoryRecordSerde.java |  4 +
 3 files changed, 104 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/292dd1d6/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
index 5334acb..8949073 100644
--- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
@@ -973,24 +973,28 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
                 logger.debug("{} recovering from {}", this, nextRecoveryPath);
                 recoveryIn = createDataInputStream(nextRecoveryPath);
                 if (hasMoreData(recoveryIn)) {
-                    final String waliImplementationClass = recoveryIn.readUTF();
-                    if (!MinimalLockingWriteAheadLog.class.getName().equals(waliImplementationClass)) {
-                        continue;
-                    }
+                    try {
+                        final String waliImplementationClass = recoveryIn.readUTF();
+                        if (!MinimalLockingWriteAheadLog.class.getName().equals(waliImplementationClass)) {
+                            continue;
+                        }
 
-                    final long waliVersion = recoveryIn.readInt();
-                    if (waliVersion > writeAheadLogVersion) {
-                        throw new IOException("Cannot recovery from file " + nextRecoveryPath + " because it was written using "
+                        final long waliVersion = recoveryIn.readInt();
+                        if (waliVersion > writeAheadLogVersion) {
+                            throw new IOException("Cannot recovery from file " + nextRecoveryPath + " because it was written using "
                                 + "WALI version " + waliVersion + ", but the version used to restore it is only " + writeAheadLogVersion);
-                    }
-
-                    final String serdeEncoding = recoveryIn.readUTF();
-                    this.recoveryVersion = recoveryIn.readInt();
-                    serde = serdeFactory.createSerDe(serdeEncoding);
+                        }
 
-                    serde.readHeader(recoveryIn);
+                        final String serdeEncoding = recoveryIn.readUTF();
+                        this.recoveryVersion = recoveryIn.readInt();
+                        serde = serdeFactory.createSerDe(serdeEncoding);
 
-                    break;
+                        serde.readHeader(recoveryIn);
+                        break;
+                    } catch (final Exception e) {
+                        logger.warn("Failed to recover data from Write-Ahead Log for {} because the header information could not be read properly. "
+                            + "This often is the result of the file not being fully written out before the application is restarted. This file will be ignored.", nextRecoveryPath);
+                    }
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/292dd1d6/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
index 5cdad82..cbca968 100644
--- a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
+++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.FileInputStream;
@@ -41,6 +42,7 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.junit.Assert;
@@ -54,6 +56,86 @@ public class TestMinimalLockingWriteAheadLog {
 
 
     @Test
+    public void testTruncatedPartitionHeader() throws IOException {
+        final int numPartitions = 4;
+
+        final Path path = Paths.get("target/testTruncatedPartitionHeader");
+        deleteRecursively(path.toFile());
+        assertTrue(path.toFile().mkdirs());
+
+        final AtomicInteger counter = new AtomicInteger(0);
+        final SerDe<Object> serde = new SerDe<Object>() {
+            @Override
+            public void readHeader(DataInputStream in) throws IOException {
+                if (counter.getAndIncrement() == 1) {
+                    throw new EOFException("Intentionally thrown for unit test");
+                }
+            }
+
+            @Override
+            public void serializeEdit(Object previousRecordState, Object newRecordState, DataOutputStream out) throws IOException {
+                out.write(1);
+            }
+
+            @Override
+            public void serializeRecord(Object record, DataOutputStream out) throws IOException {
+                out.write(1);
+            }
+
+            @Override
+            public Object deserializeEdit(DataInputStream in, Map<Object, Object> currentRecordStates, int version) throws IOException {
+                final int val = in.read();
+                return (val == 1) ? new Object() : null;
+            }
+
+            @Override
+            public Object deserializeRecord(DataInputStream in, int version) throws IOException {
+                final int val = in.read();
+                return (val == 1) ? new Object() : null;
+            }
+
+            @Override
+            public Object getRecordIdentifier(Object record) {
+                return 1;
+            }
+
+            @Override
+            public UpdateType getUpdateType(Object record) {
+                return UpdateType.CREATE;
+            }
+
+            @Override
+            public String getLocation(Object record) {
+                return null;
+            }
+
+            @Override
+            public int getVersion() {
+                return 0;
+            }
+        };
+
+        final WriteAheadRepository<Object> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, (SyncListener) null);
+        try {
+            final Collection<Object> initialRecs = repo.recoverRecords();
+            assertTrue(initialRecs.isEmpty());
+
+            repo.update(Collections.singletonList(new Object()), false);
+            repo.update(Collections.singletonList(new Object()), false);
+            repo.update(Collections.singletonList(new Object()), false);
+        } finally {
+            repo.shutdown();
+        }
+
+        final WriteAheadRepository<Object> secondRepo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, (SyncListener) null);
+        try {
+            secondRepo.recoverRecords();
+        } finally {
+            secondRepo.shutdown();
+        }
+    }
+
+    @Test
     @Ignore("for local testing only")
     public void testUpdatePerformance() throws IOException, InterruptedException {
         final int numPartitions = 4;

http://git-wip-us.apache.org/repos/asf/nifi/blob/292dd1d6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
index 75f6ff2..221f8ce 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
@@ -113,6 +113,10 @@ public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implement
     public RepositoryRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
         final SchemaRecordReader reader = SchemaRecordReader.fromSchema(recoverySchema);
         final Record updateRecord = reader.readRecord(in);
+        if (updateRecord == null) {
+            // null may be returned by reader.readRecord() if it encounters end-of-stream
+            return null;
+        }
 
         // Top level is always going to be a "Repository Record Update" record because we need a 'Union' type record at the
         // top level that indicates which type of record we have.