You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2017/04/07 14:28:47 UTC
nifi git commit: NIFI-3678: Ensure that we catch EOFException when
reading header information from WAL Partition files;
previously, we caught EOFExceptions when reading a 'record' from the WAL but
not when reading header info
Repository: nifi
Updated Branches:
refs/heads/master 6a75ab174 -> 292dd1d66
NIFI-3678: Ensure that we catch EOFException when reading header information from WAL Partition files; previously, we caught EOFExceptions when reading a 'record' from the WAL but not when reading header info
NIFI-3678: If we have a transaction ID but then have no more data written to the Partition file, we end up with an NPE. Added logic to avoid this and instead return null for the next record when this happens
This closes #1656.
Signed-off-by: Bryan Bende <bb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/292dd1d6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/292dd1d6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/292dd1d6
Branch: refs/heads/master
Commit: 292dd1d66b1726794f0d34523578727ea3a7fe08
Parents: 6a75ab1
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Apr 6 15:57:11 2017 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Fri Apr 7 10:28:27 2017 -0400
----------------------------------------------------------------------
.../org/wali/MinimalLockingWriteAheadLog.java | 32 ++++----
.../wali/TestMinimalLockingWriteAheadLog.java | 82 ++++++++++++++++++++
.../repository/SchemaRepositoryRecordSerde.java | 4 +
3 files changed, 104 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/292dd1d6/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
index 5334acb..8949073 100644
--- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
@@ -973,24 +973,28 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
logger.debug("{} recovering from {}", this, nextRecoveryPath);
recoveryIn = createDataInputStream(nextRecoveryPath);
if (hasMoreData(recoveryIn)) {
- final String waliImplementationClass = recoveryIn.readUTF();
- if (!MinimalLockingWriteAheadLog.class.getName().equals(waliImplementationClass)) {
- continue;
- }
+ try {
+ final String waliImplementationClass = recoveryIn.readUTF();
+ if (!MinimalLockingWriteAheadLog.class.getName().equals(waliImplementationClass)) {
+ continue;
+ }
- final long waliVersion = recoveryIn.readInt();
- if (waliVersion > writeAheadLogVersion) {
- throw new IOException("Cannot recovery from file " + nextRecoveryPath + " because it was written using "
+ final long waliVersion = recoveryIn.readInt();
+ if (waliVersion > writeAheadLogVersion) {
+ throw new IOException("Cannot recovery from file " + nextRecoveryPath + " because it was written using "
+ "WALI version " + waliVersion + ", but the version used to restore it is only " + writeAheadLogVersion);
- }
-
- final String serdeEncoding = recoveryIn.readUTF();
- this.recoveryVersion = recoveryIn.readInt();
- serde = serdeFactory.createSerDe(serdeEncoding);
+ }
- serde.readHeader(recoveryIn);
+ final String serdeEncoding = recoveryIn.readUTF();
+ this.recoveryVersion = recoveryIn.readInt();
+ serde = serdeFactory.createSerDe(serdeEncoding);
- break;
+ serde.readHeader(recoveryIn);
+ break;
+ } catch (final Exception e) {
+ logger.warn("Failed to recover data from Write-Ahead Log for {} because the header information could not be read properly. "
+ + "This often is the result of the file not being fully written out before the application is restarted. This file will be ignored.", nextRecoveryPath);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/292dd1d6/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
index 5cdad82..cbca968 100644
--- a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
+++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.EOFException;
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
@@ -41,6 +42,7 @@ import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
@@ -54,6 +56,86 @@ public class TestMinimalLockingWriteAheadLog {
@Test
+ public void testTruncatedPartitionHeader() throws IOException {
+ final int numPartitions = 4;
+
+ final Path path = Paths.get("target/testTruncatedPartitionHeader");
+ deleteRecursively(path.toFile());
+ assertTrue(path.toFile().mkdirs());
+
+ final AtomicInteger counter = new AtomicInteger(0);
+ final SerDe<Object> serde = new SerDe<Object>() {
+ @Override
+ public void readHeader(DataInputStream in) throws IOException {
+ if (counter.getAndIncrement() == 1) {
+ throw new EOFException("Intentionally thrown for unit test");
+ }
+ }
+
+ @Override
+ public void serializeEdit(Object previousRecordState, Object newRecordState, DataOutputStream out) throws IOException {
+ out.write(1);
+ }
+
+ @Override
+ public void serializeRecord(Object record, DataOutputStream out) throws IOException {
+ out.write(1);
+ }
+
+ @Override
+ public Object deserializeEdit(DataInputStream in, Map<Object, Object> currentRecordStates, int version) throws IOException {
+ final int val = in.read();
+ return (val == 1) ? new Object() : null;
+ }
+
+ @Override
+ public Object deserializeRecord(DataInputStream in, int version) throws IOException {
+ final int val = in.read();
+ return (val == 1) ? new Object() : null;
+ }
+
+ @Override
+ public Object getRecordIdentifier(Object record) {
+ return 1;
+ }
+
+ @Override
+ public UpdateType getUpdateType(Object record) {
+ return UpdateType.CREATE;
+ }
+
+ @Override
+ public String getLocation(Object record) {
+ return null;
+ }
+
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+ };
+
+ final WriteAheadRepository<Object> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, (SyncListener) null);
+ try {
+ final Collection<Object> initialRecs = repo.recoverRecords();
+ assertTrue(initialRecs.isEmpty());
+
+ repo.update(Collections.singletonList(new Object()), false);
+ repo.update(Collections.singletonList(new Object()), false);
+ repo.update(Collections.singletonList(new Object()), false);
+ } finally {
+ repo.shutdown();
+ }
+
+ final WriteAheadRepository<Object> secondRepo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, (SyncListener) null);
+ try {
+ secondRepo.recoverRecords();
+ } finally {
+ secondRepo.shutdown();
+ }
+ }
+
+ @Test
@Ignore("for local testing only")
public void testUpdatePerformance() throws IOException, InterruptedException {
final int numPartitions = 4;
http://git-wip-us.apache.org/repos/asf/nifi/blob/292dd1d6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
index 75f6ff2..221f8ce 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
@@ -113,6 +113,10 @@ public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implement
public RepositoryRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
final SchemaRecordReader reader = SchemaRecordReader.fromSchema(recoverySchema);
final Record updateRecord = reader.readRecord(in);
+ if (updateRecord == null) {
+ // null may be returned by reader.readRecord() if it encounters end-of-stream
+ return null;
+ }
// Top level is always going to be a "Repository Record Update" record because we need a 'Union' type record at the
// top level that indicates which type of record we have.