You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2019/02/05 20:23:48 UTC

[nifi] branch master updated: NIFI-5997: Recover FlowFile Repository before swap files; then, when recovering swap files, ignore any that are unknown to the flowfile repo. This prevents us from incrementing the size of the flowfile queue for unknown swap files

This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 412c490  NIFI-5997: Recover FlowFile Repository before swap files; then, when recovering swap files, ignore any that are unknown to the flowfile repo. This prevents us from incrementing the size of the flowfile queue for unknown swap files
412c490 is described below

commit 412c4908e2c5d79d958b09403c816db57c828179
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Feb 5 11:24:42 2019 -0500

    NIFI-5997: Recover FlowFile Repository before swap files; then, when recovering swap files, ignore any that are unknown to the flowfile repo. This prevents us from incrementing the size of the flowfile queue for unknown swap files
    
    This closes #3292.
    
    Signed-off-by: Bryan Bende <bb...@apache.org>
---
 .../controller/repository/FlowFileRepository.java     | 11 ++++++++---
 .../apache/nifi/controller/FileSystemSwapManager.java |  6 ++++++
 .../org/apache/nifi/controller/FlowController.java    |  4 +++-
 .../repository/VolatileFlowFileRepository.java        | 18 ++++++++++++++++--
 .../repository/WriteAheadFlowFileRepository.java      | 19 +++++++++++++++++--
 .../repository/TestStandardProcessSession.java        |  6 +++++-
 .../repository/TestWriteAheadFlowFileRepository.java  | 14 +++++++-------
 7 files changed, 62 insertions(+), 16 deletions(-)

diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
index b9ff249..ac6e68c 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
@@ -78,13 +78,11 @@ public interface FlowFileRepository extends Closeable {
      *
      * @param queueProvider the provider of FlowFile Queues into which the
      * FlowFiles should be enqueued
-     * @param minimumSequenceNumber specifies the minimum value that should be
-     * returned by a call to {@link #getNextFlowFileSequence()}
      *
      * @return index of highest flow file identifier
      * @throws IOException if load fails
      */
-    long loadFlowFiles(QueueProvider queueProvider, long minimumSequenceNumber) throws IOException;
+    long loadFlowFiles(QueueProvider queueProvider) throws IOException;
 
     /**
      * @return <code>true</code> if the Repository is volatile (i.e., its data
@@ -105,6 +103,13 @@ public interface FlowFileRepository extends Closeable {
     long getMaxFlowFileIdentifier() throws IOException;
 
     /**
+     * Notifies the FlowFile Repository that the given identifier has been identified as the maximum value that
+     * has been encountered for an 'external' (swapped out) FlowFile.
+     * @param maxId the max id of any FlowFile encountered
+     */
+    void updateMaxFlowFileIdentifier(long maxId);
+
+    /**
      * Updates the Repository to indicate that the given FlowFileRecords were
      * Swapped Out of memory
      *
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index b2717c2..8a0aa3b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -298,6 +298,12 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
                     }
                 }
 
+                final boolean validLocation = flowFileRepository.isValidSwapLocationSuffix(swapFile.getName());
+                if (!validLocation) {
+                    logger.warn("Encountered unknown Swap File {}; will ignore this Swap File. This file should be cleaned up manually", swapFile);
+                    continue;
+                }
+
                 swapLocations.add(swapFile.getAbsolutePath());
                 continue;
             }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 8ab7e69..b731610 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -748,6 +748,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
             // get all connections/queues and recover from swap files.
             final List<Connection> connections = flowManager.getRootGroup().findAllConnections();
 
+            flowFileRepository.loadFlowFiles(new StandardQueueProvider(this));
+
             long maxIdFromSwapFiles = -1L;
             if (flowFileRepository.isVolatile()) {
                 for (final Connection connection : connections) {
@@ -771,7 +773,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
                 }
             }
 
-            flowFileRepository.loadFlowFiles(new StandardQueueProvider(this), maxIdFromSwapFiles + 1);
+            flowFileRepository.updateMaxFlowFileIdentifier(maxIdFromSwapFiles + 1);
 
             // Begin expiring FlowFiles that are old
             final RepositoryContextFactory contextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository,
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
index da714a6..979a22e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
@@ -114,12 +114,26 @@ public class VolatileFlowFileRepository implements FlowFileRepository {
     }
 
     @Override
-    public long loadFlowFiles(final QueueProvider queueProvider, final long minimumSequenceNumber) throws IOException {
-        idGenerator.set(minimumSequenceNumber);
+    public long loadFlowFiles(final QueueProvider queueProvider) throws IOException {
         return 0;
     }
 
     @Override
+    public void updateMaxFlowFileIdentifier(final long maxId) {
+        while (true) {
+            final long currentId = idGenerator.get();
+            if (currentId >= maxId) {
+                return;
+            }
+
+            final boolean updated = idGenerator.compareAndSet(currentId, maxId);
+            if (updated) {
+                return;
+            }
+        }
+    }
+
+    @Override
     public long getNextFlowFileSequence() {
         return idGenerator.getAndIncrement();
     }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index d8e45f2..2fc2855 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -585,7 +585,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
     }
 
     @Override
-    public long loadFlowFiles(final QueueProvider queueProvider, final long minimumSequenceNumber) throws IOException {
+    public long loadFlowFiles(final QueueProvider queueProvider) throws IOException {
         final Map<String, FlowFileQueue> queueMap = new HashMap<>();
         for (final FlowFileQueue queue : queueProvider.getAllQueues()) {
             queueMap.put(queue.getIdentifier(), queue);
@@ -630,7 +630,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
 
         // Determine the next sequence number for FlowFiles
         int numFlowFilesMissingQueue = 0;
-        long maxId = minimumSequenceNumber;
+        long maxId = 0;
         for (final RepositoryRecord record : recordList) {
             final long recordId = serdeFactory.getRecordIdentifier(record);
             if (recordId > maxId) {
@@ -676,6 +676,21 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
     }
 
     @Override
+    public void updateMaxFlowFileIdentifier(final long maxId) {
+        while (true) {
+            final long currentId = flowFileSequenceGenerator.get();
+            if (currentId >= maxId) {
+                return;
+            }
+
+            final boolean updated = flowFileSequenceGenerator.compareAndSet(currentId, maxId);
+            if (updated) {
+                return;
+            }
+        }
+    }
+
+    @Override
     public long getNextFlowFileSequence() {
         return flowFileSequenceGenerator.getAndIncrement();
     }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 7cd2fd6..42e61d3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -2105,6 +2105,10 @@ public class TestStandardProcessSession {
         }
 
         @Override
+        public void updateMaxFlowFileIdentifier(final long maxId) {
+        }
+
+        @Override
         public void updateRepository(Collection<RepositoryRecord> records) throws IOException {
             if (failOnUpdate) {
                 throw new IOException("FlowFile Repository told to fail on update for unit test");
@@ -2137,7 +2141,7 @@ public class TestStandardProcessSession {
         }
 
         @Override
-        public long loadFlowFiles(QueueProvider queueProvider, long minimumSequenceNumber) throws IOException {
+        public long loadFlowFiles(QueueProvider queueProvider) throws IOException {
             return 0;
         }
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index 1761bd8..cd3ee1c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -422,7 +422,7 @@ public class TestWriteAheadFlowFileRepository {
         repo.initialize(new StandardResourceClaimManager());
 
         final TestQueueProvider queueProvider = new TestQueueProvider();
-        repo.loadFlowFiles(queueProvider, 0L);
+        repo.loadFlowFiles(queueProvider);
 
         final Connection connection = Mockito.mock(Connection.class);
         when(connection.getIdentifier()).thenReturn("1234");
@@ -449,7 +449,7 @@ public class TestWriteAheadFlowFileRepository {
         // restore
         final WriteAheadFlowFileRepository repo2 = new WriteAheadFlowFileRepository(NiFiProperties.createBasicNiFiProperties(null, null));
         repo2.initialize(new StandardResourceClaimManager());
-        repo2.loadFlowFiles(queueProvider, 0L);
+        repo2.loadFlowFiles(queueProvider);
         assertTrue(repo2.isValidSwapLocationSuffix("swap123"));
         assertFalse(repo2.isValidSwapLocationSuffix("other"));
         repo2.close();
@@ -466,7 +466,7 @@ public class TestWriteAheadFlowFileRepository {
         repo.initialize(new StandardResourceClaimManager());
 
         final TestQueueProvider queueProvider = new TestQueueProvider();
-        repo.loadFlowFiles(queueProvider, 0L);
+        repo.loadFlowFiles(queueProvider);
 
         final Connection connection = Mockito.mock(Connection.class);
         when(connection.getIdentifier()).thenReturn("1234");
@@ -519,7 +519,7 @@ public class TestWriteAheadFlowFileRepository {
         // resource claims' counts should be updated for both the swapped out FlowFile and the non-swapped out FlowFile
         try (final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository(NiFiProperties.createBasicNiFiProperties(null, null))) {
             repo.initialize(claimManager);
-            repo.loadFlowFiles(queueProvider, -1L);
+            repo.loadFlowFiles(queueProvider);
 
             // Create a Repository Record that indicates that a FlowFile was created
             final FlowFileRecord flowFile1 = new StandardFlowFileRecord.Builder()
@@ -554,7 +554,7 @@ public class TestWriteAheadFlowFileRepository {
         final ResourceClaimManager recoveryClaimManager = new StandardResourceClaimManager();
         try (final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository(NiFiProperties.createBasicNiFiProperties(null, null))) {
             repo.initialize(recoveryClaimManager);
-            final long largestId = repo.loadFlowFiles(queueProvider, 0L);
+            final long largestId = repo.loadFlowFiles(queueProvider);
 
             // largest ID known is 1 because this doesn't take into account the FlowFiles that have been swapped out
             assertEquals(1, largestId);
@@ -587,7 +587,7 @@ public class TestWriteAheadFlowFileRepository {
         repo.initialize(new StandardResourceClaimManager());
 
         final TestQueueProvider queueProvider = new TestQueueProvider();
-        repo.loadFlowFiles(queueProvider, 0L);
+        repo.loadFlowFiles(queueProvider);
 
         final List<FlowFileRecord> flowFileCollection = new ArrayList<>();
 
@@ -639,7 +639,7 @@ public class TestWriteAheadFlowFileRepository {
         // restore
         final WriteAheadFlowFileRepository repo2 = new WriteAheadFlowFileRepository(NiFiProperties.createBasicNiFiProperties(null, null));
         repo2.initialize(new StandardResourceClaimManager());
-        repo2.loadFlowFiles(queueProvider, 0L);
+        repo2.loadFlowFiles(queueProvider);
 
         assertEquals(1, flowFileCollection.size());
         final FlowFileRecord flowFile = flowFileCollection.get(0);