You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2020/08/06 14:56:09 UTC

[nifi] branch main updated: NIFI-7706, NIFI-5702: Allow NiFi to keep FlowFiles if their queue is unknown. This way, if a Flow is inadvertently removed, updated, etc., and NiFi is restarted, the data will not be dropped by default. The old mechanism of dropping data is exposed via a property

This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new eca7f15  NIFI-7706, NIFI-5702: Allow NiFi to keep FlowFiles if their queue is unknown. This way, if a Flow is inadvertently removed, updated, etc., and NiFi is restarted, the data will not be dropped by default. The old mechanism of dropping data is exposed via a property
eca7f15 is described below

commit eca7f153d0b23d39f14e5c65d1f96c7f663b2bbe
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Wed Aug 5 15:47:05 2020 -0400

    NIFI-7706, NIFI-5702: Allow NiFi to keep FlowFiles if their queue is unknown. This way, if a Flow is inadvertently removed, updated, etc., and NiFi is restarted, the data will not be dropped by default. The old mechanism of dropping data is exposed via a property
    
    This closes #4454.
    
    Signed-off-by: Bryan Bende <bb...@apache.org>
---
 .../controller/repository/FlowFileRepository.java  |  10 ++
 .../apache/nifi/controller/flow/FlowManager.java   |   5 +
 .../org/apache/nifi/controller/FlowController.java |   2 +-
 .../nifi/controller/flow/StandardFlowManager.java  |  42 +++++++++
 .../repository/WriteAheadFlowFileRepository.java   | 105 ++++++++++++++++-----
 .../bootstrap/tasks/ContentRepositoryScanTask.java |  13 +++
 .../nifi-framework/nifi-resources/pom.xml          |   1 +
 .../src/main/resources/conf/nifi.properties        |   1 +
 nifi-system-tests/nifi-system-test-suite/pom.xml   |   7 ++
 .../nifi/tests/system/AggregateNiFiInstance.java   |  17 +++-
 .../apache/nifi/tests/system/NiFiClientUtil.java   |  18 ++++
 .../org/apache/nifi/tests/system/NiFiSystemIT.java |   8 +-
 .../SpawnedStandaloneNiFiInstanceFactory.java      |   4 +
 .../system/restart/FlowFileRestorationIT.java      | 102 ++++++++++++++++++++
 .../resources/conf/clustered/node1/bootstrap.conf  |   2 +-
 .../resources/conf/clustered/node2/bootstrap.conf  |   2 +-
 .../cli/impl/client/nifi/ConnectionClient.java     |   3 +
 .../client/nifi/impl/JerseyConnectionClient.java   |  24 +++++
 18 files changed, 331 insertions(+), 35 deletions(-)

diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
index 0a7bad7..67165ee 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
@@ -23,6 +23,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -178,4 +179,13 @@ public interface FlowFileRepository extends Closeable {
         throws IOException {
         return null;
     }
+
+    /**
+     * Returns the set of Resource Claims that are referenced by FlowFiles that have been "orphaned" because they belong to FlowFile Queues/Connections
+     * that did not exist in the flow when NiFi started. This default implementation reports no orphaned Resource Claims.
+     * @return the set of orphaned Resource Claims; the default implementation returns an immutable empty set
+     */
+    default Set<ResourceClaim> findOrphanedResourceClaims() {
+        return Collections.emptySet();
+    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java
index ba24705..c4503c4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java
@@ -328,6 +328,11 @@ public interface FlowManager {
     ParameterContextManager getParameterContextManager();
 
     /**
+     * @return a map from component type name (e.g., "Processors", "Controller Services", "Process Groups") to the number of components of that type in the flow
+     */
+    Map<String, Integer> getComponentCounts();
+
+    /**
      * Purges all components from the flow, including:
      *
      * Process Groups (and all components within it)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 97b4c13..500cb15 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -1438,7 +1438,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
             }
 
             flowSynchronized.set(true);
-            LOG.info("Successfully synchronized controller with proposed flow");
+            LOG.info("Successfully synchronized controller with proposed flow. Flow contains the following number of components: {}", flowManager.getComponentCounts());
         } finally {
             writeLock.unlock("synchronize");
         }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
index 4663d62..5c42de2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
@@ -87,6 +87,7 @@ import java.net.URL;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -218,6 +219,10 @@ public class StandardFlowManager implements FlowManager {
     }
 
     public void setRootGroup(final ProcessGroup rootGroup) {
+        if (this.rootGroup != null && this.rootGroup.isEmpty()) {
+            allProcessGroups.remove(this.rootGroup.getIdentifier());
+        }
+
         this.rootGroup = rootGroup;
         allProcessGroups.put(ROOT_GROUP_ID_ALIAS, rootGroup);
         allProcessGroups.put(rootGroup.getIdentifier(), rootGroup);
@@ -739,6 +744,43 @@ public class StandardFlowManager implements FlowManager {
     }
 
     @Override
+    public Map<String, Integer> getComponentCounts() {
+        final Map<String, Integer> componentCounts = new LinkedHashMap<>(); // LinkedHashMap so that entries are reported in the order they are added below
+        componentCounts.put("Processors", allProcessors.size());
+        componentCounts.put("Controller Services", getAllControllerServices().size());
+        componentCounts.put("Reporting Tasks", getAllReportingTasks().size());
+        componentCounts.put("Process Groups", allProcessGroups.size() - 2); // -2 to account for the root group because we don't want it in our counts and the 'root group alias' key.
+        componentCounts.put("Remote Process Groups", getRootGroup().findAllRemoteProcessGroups().size());
+        // A port that is an instance of PublicPort is counted as public; every other port is counted as local.
+        int localInputPorts = 0;
+        int publicInputPorts = 0;
+        for (final Port port : allInputPorts.values()) {
+            if (port instanceof PublicPort) {
+                publicInputPorts++;
+            } else {
+                localInputPorts++;
+            }
+        }
+        // Output ports are classified the same way as input ports.
+        int localOutputPorts = 0;
+        int publicOutputPorts = 0;
+        for (final Port port : allOutputPorts.values()) {
+            if (port instanceof PublicPort) {
+                publicOutputPorts++;
+            } else {
+                localOutputPorts++;
+            }
+        }
+
+        componentCounts.put("Local Input Ports", localInputPorts);
+        componentCounts.put("Local Output Ports", localOutputPorts);
+        componentCounts.put("Public Input Ports", publicInputPorts);
+        componentCounts.put("Public Output Ports", publicOutputPorts);
+
+        return componentCounts;
+    }
+
+    @Override
     public ParameterContext createParameterContext(final String id, final String name, final Map<String, Parameter> parameters) {
         final boolean namingConflict = parameterContextManager.getParameterContexts().stream()
             .anyMatch(paramContext -> paramContext.getName().equals(name));
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 7f4991d..432e4e4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -16,6 +16,22 @@
  */
 package org.apache.nifi.controller.repository;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
+import org.apache.nifi.wali.SnapshotCapture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wali.MinimalLockingWriteAheadLog;
+import org.wali.SyncListener;
+import org.wali.WriteAheadRepository;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
@@ -42,21 +58,6 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
-import org.apache.nifi.wali.SnapshotCapture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wali.MinimalLockingWriteAheadLog;
-import org.wali.SyncListener;
-import org.wali.WriteAheadRepository;
 
 /**
  * <p>
@@ -84,6 +85,7 @@ import org.wali.WriteAheadRepository;
 public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncListener {
     static final String FLOWFILE_REPOSITORY_DIRECTORY_PREFIX = "nifi.flowfile.repository.directory";
     private static final String WRITE_AHEAD_LOG_IMPL = "nifi.flowfile.repository.wal.implementation";
+    private static final String RETAIN_ORPHANED_FLOWFILES = "nifi.flowfile.repository.retain.orphaned.flowfiles";
 
     static final String SEQUENTIAL_ACCESS_WAL = "org.apache.nifi.wali.SequentialAccessWriteAheadLog";
     static final String ENCRYPTED_SEQUENTIAL_ACCESS_WAL = "org.apache.nifi.wali.EncryptedSequentialAccessWriteAheadLog";
@@ -95,6 +97,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
 
     final AtomicLong flowFileSequenceGenerator = new AtomicLong(0L);
     private final boolean alwaysSync;
+    private final boolean retainOrphanedFlowFiles;
 
     private static final Logger logger = LoggerFactory.getLogger(WriteAheadFlowFileRepository.class);
     volatile ScheduledFuture<?> checkpointFuture;
@@ -105,6 +108,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
     final ScheduledExecutorService checkpointExecutor;
 
     private volatile Collection<SerializedRepositoryRecord> recoveredRecords = null;
+    private final Set<ResourceClaim> orphanedResourceClaims = Collections.synchronizedSet(new HashSet<>());
 
     private final Set<String> swapLocationSuffixes = new HashSet<>(); // guarded by synchronizing on object itself
 
@@ -145,12 +149,16 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         checkpointExecutor = null;
         walImplementation = null;
         nifiProperties = null;
+        retainOrphanedFlowFiles = true;
     }
 
     public WriteAheadFlowFileRepository(final NiFiProperties nifiProperties) {
         alwaysSync = Boolean.parseBoolean(nifiProperties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_ALWAYS_SYNC, "false"));
         this.nifiProperties = nifiProperties;
 
+        final String orphanedFlowFileProperty = nifiProperties.getProperty(RETAIN_ORPHANED_FLOWFILES);
+        retainOrphanedFlowFiles = orphanedFlowFileProperty == null || Boolean.parseBoolean(orphanedFlowFileProperty);
+
         // determine the database file path and ensure it exists
         String writeAheadLogImpl = nifiProperties.getProperty(WRITE_AHEAD_LOG_IMPL);
         if (writeAheadLogImpl == null) {
@@ -865,6 +873,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
             queueMap.put(queue.getIdentifier(), queue);
         }
 
+        final List<SerializedRepositoryRecord> dropRecords = new ArrayList<>();
         int numFlowFilesMissingQueue = 0;
         long maxId = 0;
         for (final SerializedRepositoryRecord record : recordList) {
@@ -876,23 +885,52 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
             final String queueId = record.getQueueIdentifier();
             if (queueId == null) {
                 numFlowFilesMissingQueue++;
-                logger.warn("Encounted Repository Record (id={}) with no Queue Identifier. Dropping this FlowFile", recordId);
+                logger.warn("Encountered Repository Record (id={}) with no Queue Identifier. Dropping this FlowFile", recordId);
+
+                // Add a drop record so that the record is not retained
+                dropRecords.add(new ReconstitutedSerializedRepositoryRecord.Builder()
+                    .flowFileRecord(record.getFlowFileRecord())
+                    .swapLocation(record.getSwapLocation())
+                    .type(RepositoryRecordType.DELETE)
+                    .build());
+
                 continue;
             }
 
+            final ContentClaim claim = record.getContentClaim();
             final FlowFileQueue flowFileQueue = queueMap.get(queueId);
-            if (flowFileQueue == null) {
+            final boolean orphaned = flowFileQueue == null;
+            if (orphaned) {
                 numFlowFilesMissingQueue++;
-                logger.warn("Encountered Repository Record (id={}) with Queue identifier {} but no Queue exists with that ID. Dropping this FlowFile", recordId, queueId);
-                continue;
-            }
 
-            flowFileQueue.put(record.getFlowFileRecord());
+                if (isRetainOrphanedFlowFiles()) {
+                    if (claim == null) {
+                        logger.warn("Encountered Repository Record (id={}) with Queue identifier {} but no Queue exists with that ID. This FlowFile will not be restored to any "
+                            + "FlowFile Queue in the flow. However, it will remain in the FlowFile Repository in case the flow containing this queue is later restored.", recordId, queueId);
+                    } else {
+                        claimManager.incrementClaimantCount(claim.getResourceClaim());
+                        orphanedResourceClaims.add(claim.getResourceClaim());
+                        logger.warn("Encountered Repository Record (id={}) with Queue identifier {} but no Queue exists with that ID. "
+                                + "This FlowFile will not be restored to any FlowFile Queue in the flow. However, it will remain in the FlowFile Repository in "
+                                + "case the flow containing this queue is later restored. This may result in the following Content Claim not being cleaned "
+                                + "up by the Content Repository: {}", recordId, queueId, claim);
+                    }
+                } else {
+                    dropRecords.add(new ReconstitutedSerializedRepositoryRecord.Builder()
+                        .flowFileRecord(record.getFlowFileRecord())
+                        .swapLocation(record.getSwapLocation())
+                        .type(RepositoryRecordType.DELETE)
+                        .build());
+
+                    logger.warn("Encountered Repository Record (id={}) with Queue identifier {} but no Queue exists with that ID. This FlowFile will be dropped.", recordId, queueId);
+                }
 
-            final ContentClaim claim = record.getContentClaim();
-            if (claim != null) {
+                continue;
+            } else if (claim != null) {
                 claimManager.incrementClaimantCount(claim.getResourceClaim());
             }
+
+            flowFileQueue.put(record.getFlowFileRecord());
         }
 
         // If recoveredRecords has been populated it need to be nulled out now because it is no longer useful and can be garbage collected.
@@ -903,7 +941,17 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         flowFileSequenceGenerator.set(maxId + 1);
         logger.info("Successfully restored {} FlowFiles and {} Swap Files", recordList.size() - numFlowFilesMissingQueue, recoveredSwapLocations.size());
         if (numFlowFilesMissingQueue > 0) {
-            logger.warn("On recovery, found {} FlowFiles whose queue no longer exists. These FlowFiles will be dropped.", numFlowFilesMissingQueue);
+            logger.warn("On recovery, found {} FlowFiles whose queues no longer exists.", numFlowFilesMissingQueue);
+        }
+
+        if (dropRecords.isEmpty()) {
+            logger.debug("No Drop Records to update Repository with");
+        } else {
+            final long updateStart = System.nanoTime();
+            wal.update(dropRecords, true);
+            final long updateEnd = System.nanoTime();
+            final long updateMillis = TimeUnit.MILLISECONDS.convert(updateEnd - updateStart, TimeUnit.NANOSECONDS);
+            logger.info("Successfully updated FlowFile Repository with {} Drop Records due to missing queues in {} milliseconds", dropRecords.size(), updateMillis);
         }
 
         final Runnable checkpointRunnable = new Runnable() {
@@ -927,6 +975,15 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         return maxId;
     }
 
+    private boolean isRetainOrphanedFlowFiles() { // true when FlowFiles whose queue is missing from the flow are kept in the repository rather than dropped
+        return retainOrphanedFlowFiles;
+    }
+
+    @Override
+    public Set<ResourceClaim> findOrphanedResourceClaims() { // returns a read-only live view of the recorded orphaned claims, not a snapshot copy
+        return Collections.unmodifiableSet(orphanedResourceClaims);
+    }
+
     @Override
     public void updateMaxFlowFileIdentifier(final long maxId) {
         while (true) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ContentRepositoryScanTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ContentRepositoryScanTask.java
index 8493099..ca175df 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ContentRepositoryScanTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ContentRepositoryScanTask.java
@@ -86,6 +86,19 @@ public class ContentRepositoryScanTask implements DiagnosticTask {
             }
         }
 
+        details.add(""); // Insert empty detail lines to make output more readable.
+
+        final Set<ResourceClaim> orphanedResourceClaims = flowFileRepository.findOrphanedResourceClaims();
+        if (orphanedResourceClaims == null || orphanedResourceClaims.isEmpty()) {
+            details.add("No Resource Claims were referenced by orphaned FlowFiles.");
+        } else {
+            details.add("The following Resource Claims were referenced by orphaned FlowFiles (FlowFiles that exist in the FlowFile Repository but for which the FlowFile's connection/queue" +
+                " did not exist when NiFi started):");
+
+            for (final ResourceClaim claim : orphanedResourceClaims) {
+                details.add(claim.toString());
+            }
+        }
 
         return new StandardDiagnosticsDumpElement("Content Repository Scan", details);
     }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
index f1bf3e1..7084a45 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
@@ -62,6 +62,7 @@
         <nifi.flowfile.repository.encryption.key.provider.location />
         <nifi.flowfile.repository.encryption.key.id />
         <nifi.flowfile.repository.encryption.key />
+        <nifi.flowfile.repository.retain.orphaned.flowfiles>true</nifi.flowfile.repository.retain.orphaned.flowfiles>
         <nifi.swap.manager.implementation>org.apache.nifi.controller.FileSystemSwapManager</nifi.swap.manager.implementation>
         <nifi.queue.swap.threshold>20000</nifi.queue.swap.threshold>
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index 9f2d544..5c0166e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -67,6 +67,7 @@ nifi.flowfile.repository.encryption.key.provider.implementation=${nifi.flowfile.
 nifi.flowfile.repository.encryption.key.provider.location=${nifi.flowfile.repository.encryption.key.provider.location}
 nifi.flowfile.repository.encryption.key.id=${nifi.flowfile.repository.encryption.key.id}
 nifi.flowfile.repository.encryption.key=${nifi.flowfile.repository.encryption.key}
+nifi.flowfile.repository.retain.orphaned.flowfiles=${nifi.flowfile.repository.retain.orphaned.flowfiles}
 
 nifi.swap.manager.implementation=${nifi.swap.manager.implementation}
 nifi.queue.swap.threshold=${nifi.queue.swap.threshold}
diff --git a/nifi-system-tests/nifi-system-test-suite/pom.xml b/nifi-system-tests/nifi-system-test-suite/pom.xml
index 426f506..c186323 100644
--- a/nifi-system-tests/nifi-system-test-suite/pom.xml
+++ b/nifi-system-tests/nifi-system-test-suite/pom.xml
@@ -169,6 +169,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-server-nar</artifactId>
+            <version>1.12.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-provenance-repository-nar</artifactId>
             <version>1.12.0-SNAPSHOT</version>
             <type>nar</type>
@@ -192,6 +198,7 @@
             <type>nar</type>
         </dependency>
 
+
         <!-- dependencies for jaxb/activation/annotation for running NiFi on Java 11 -->
         <!-- TODO: remove these once minimum Java version is 11 -->
         <dependency>
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/AggregateNiFiInstance.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/AggregateNiFiInstance.java
index 9358e86..b295bba 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/AggregateNiFiInstance.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/AggregateNiFiInstance.java
@@ -18,6 +18,7 @@ package org.apache.nifi.tests.system;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -31,9 +32,23 @@ public class AggregateNiFiInstance implements NiFiInstance {
 
     @Override
     public void start(boolean waitForCompletion) {
+        final Map<Thread, NiFiInstance> startupThreads = new HashMap<>();
+        // Start each auto-start instance on its own thread so that the instances start up concurrently.
         for (final NiFiInstance instance : instances) {
             if (instance.isAutoStart()) {
-                instance.start(waitForCompletion);
+                final Thread t = new Thread(() -> instance.start(waitForCompletion));
+                t.start();
+                startupThreads.put(t, instance);
+            }
+        }
+        // Block until every startup thread has finished before returning to the caller.
+        for (final Map.Entry<Thread, NiFiInstance> entry : startupThreads.entrySet()) {
+            final Thread startupThread = entry.getKey();
+            try {
+                startupThread.join();
+            } catch (final InterruptedException ie) {
+                Thread.currentThread().interrupt(); // restore the interrupt flag so callers can still observe the interruption
+                throw new RuntimeException("Interrupted while waiting for instance " + entry.getValue() + " to finish starting", ie);
             }
         }
     }
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 3fabdd7..5eaf8c6 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -82,6 +82,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -774,6 +775,23 @@ public class NiFiClientUtil {
         return flowFileEntity;
     }
 
+    public InputStream getFlowFileContent(final String connectionId, final int flowFileIndex) throws NiFiClientException, IOException {
+        final ListingRequestEntity listing = performQueueListing(connectionId);
+        final List<FlowFileSummaryDTO> flowFileSummaries = listing.getListingRequest().getFlowFileSummaries();
+        if (flowFileIndex < 0 || flowFileIndex >= flowFileSummaries.size()) { // reject negative indices too, instead of failing later inside List.get
+            throw new IllegalArgumentException("Cannot retrieve FlowFile with index " + flowFileIndex + " because queue only has " + flowFileSummaries.size() + " FlowFiles");
+        }
+        // The summary at the requested position supplies the UUID and cluster node ID used to address the FlowFile.
+        final FlowFileSummaryDTO flowFileSummary = flowFileSummaries.get(flowFileIndex);
+        final String uuid = flowFileSummary.getUuid();
+        final String nodeId = flowFileSummary.getClusterNodeId();
+        // NOTE(review): the entity fetched here does not feed the return value; presumably the lookup is intentional - confirm before removing.
+        final FlowFileEntity flowFileEntity = nifiClient.getConnectionClient().getFlowFile(connectionId, uuid, nodeId);
+        flowFileEntity.getFlowFile().setClusterNodeId(nodeId);
+
+        return nifiClient.getConnectionClient().getFlowFileContent(connectionId, uuid, nodeId);
+    }
+
     public VariableRegistryUpdateRequestEntity updateVariableRegistry(final ProcessGroupEntity processGroup, final Map<String, String> variables) throws NiFiClientException, IOException {
         final Set<VariableEntity> variableEntities = new HashSet<>();
         for (final Map.Entry<String, String> entry : variables.entrySet()) {
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
index 391a64d..c27f429 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.tests.system;
 
-import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientConfig;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
@@ -60,15 +59,10 @@ public abstract class NiFiSystemIT {
     private static final File LIB_DIR = new File("target/nifi-lib-assembly/lib");
     private static volatile String nifiFrameworkVersion = null;
 
-    protected static final Relationship REL_SUCCESS = new Relationship.Builder()
-        .name("success")
-        .description("Convenience Relationship for use in tests")
-        .build();
-
     @Rule
     public TestName name = new TestName();
     @Rule
-    public Timeout defaultTimeout = new Timeout(2, TimeUnit.MINUTES);
+    public Timeout defaultTimeout = new Timeout(5, TimeUnit.MINUTES);
 
     private NiFiClient nifiClient;
     private NiFiClientUtil clientUtil;
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java
index c19a755..412dd0f 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java
@@ -78,6 +78,10 @@ public class SpawnedStandaloneNiFiInstanceFactory implements NiFiInstanceFactory
             }
         }
 
+        @Override
+        public String toString() { // identifies this instance by its instance directory
+            return "RunNiFiInstance[dir=" + instanceDirectory + "]";
+        }
         @Override
         public void start(final boolean waitForCompletion) {
             if (runNiFi != null) {
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/restart/FlowFileRestorationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/restart/FlowFileRestorationIT.java
new file mode 100644
index 0000000..c9003cb
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/restart/FlowFileRestorationIT.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.restart;
+
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.tests.system.NiFiInstance;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.Collections;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/** System test verifying that a FlowFile whose queue disappears from the flow is kept across restarts and is available again once the queue is restored. */
+public class FlowFileRestorationIT extends NiFiSystemIT {
+
+    @Test
+    public void testDataInMissingQueueRestoredWhenQueueRestored() throws NiFiClientException, IOException, InterruptedException {
+        final ProcessorEntity generator = getClientUtil().createProcessor("GenerateFlowFile");
+        getClientUtil().updateProcessorProperties(generator, Collections.singletonMap("File Size", "1 KB"));
+        getClientUtil().updateProcessorSchedulingPeriod(generator, "100 min");
+
+        final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
+        final ConnectionEntity connection = getClientUtil().createConnection(generator, terminate, "success");
+
+        getNifiClient().getProcessorClient().startProcessor(generator);
+        waitForQueueCount(connection.getId(), 1);
+        getNifiClient().getProcessorClient().stopProcessor(generator);
+
+        final byte[] flowFileContents = getFlowFileContents(connection.getId(), 0);
+        assertEquals(1024, flowFileContents.length);
+
+        final NiFiInstance nifiInstance = getNiFiInstance();
+        nifiInstance.stop();
+
+        final File nifiHome = nifiInstance.getInstanceDirectory();
+        final File confDir = new File(nifiHome, "conf");
+        final File flowXmlGz = new File(confDir, "flow.xml.gz");
+        final byte[] flowXmlGzBytes = Files.readAllBytes(flowXmlGz.toPath());
+        assertTrue(flowXmlGz.delete());
+
+        nifiInstance.start();
+
+        try {
+            getNifiClient().getConnectionClient().getConnection(connection.getId());
+            Assert.fail("Didn't expect to retrieve a connection");
+        } catch (final NiFiClientException nfce) {
+            // Expected because the connection no longer exists.
+        }
+
+        // Stop the instance, restore the flow.xml.gz, and restart.
+        // NiFi may have written a new flow.xml.gz while running without one; truncate so no stale bytes trail the restored flow.
+        nifiInstance.stop();
+        Files.write(flowXmlGz.toPath(), flowXmlGzBytes, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
+        nifiInstance.start();
+
+        // Ensure that there's a FlowFile queued up and that its contents are still accessible and have not changed.
+        final ConnectionEntity retrievedConnection = getNifiClient().getConnectionClient().getConnection(connection.getId());
+        assertNotNull(retrievedConnection);
+        waitForQueueCount(connection.getId(), 1);
+        final byte[] contentsAfterRestart = getFlowFileContents(connection.getId(), 0);
+
+        assertArrayEquals(flowFileContents, contentsAfterRestart);
+    }
+
+    /** Copies the full content of the FlowFile at the given index of the given connection's queue into a byte array. */
+    private byte[] getFlowFileContents(final String connectionId, final int flowFileIndex) throws IOException, NiFiClientException {
+        try (final InputStream in = getClientUtil().getFlowFileContent(connectionId, flowFileIndex);
+             final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            StreamUtils.copy(in, baos);
+            return baos.toByteArray();
+        }
+    }
+}
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/bootstrap.conf b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/bootstrap.conf
index 297108b..9c00c7c 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/bootstrap.conf
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/bootstrap.conf
@@ -24,7 +24,7 @@ graceful.shutdown.seconds=20
 
 # JVM memory settings
 java.arg.2= -Xms128m
-java.arg.3=-Xmx128m
+java.arg.3=-Xmx256m
 
 java.arg.14=-Djava.awt.headless=true
 
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/bootstrap.conf b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/bootstrap.conf
index 0f4b8f3..66c7e1f 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/bootstrap.conf
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/bootstrap.conf
@@ -24,7 +24,7 @@ graceful.shutdown.seconds=20
 
 # JVM memory settings
 java.arg.2= -Xms128m
-java.arg.3=-Xmx128m
+java.arg.3=-Xmx256m
 
 java.arg.14=-Djava.awt.headless=true
 
diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java
index f5c07d1..ddd337d 100644
--- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java
+++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java
@@ -22,6 +22,7 @@ import org.apache.nifi.web.api.entity.FlowFileEntity;
 import org.apache.nifi.web.api.entity.ListingRequestEntity;
 
 import java.io.IOException;
+import java.io.InputStream;
 
 public interface ConnectionClient {
     ConnectionEntity getConnection(String id) throws NiFiClientException, IOException;
@@ -49,4 +50,6 @@ public interface ConnectionClient {
     FlowFileEntity getFlowFile(String connectionId, String flowFileUuid) throws NiFiClientException, IOException;
 
     FlowFileEntity getFlowFile(String connectionId, String flowFileUuid, String nodeId) throws NiFiClientException, IOException;
+
+    InputStream getFlowFileContent(String connectionId, String flowFileUuid, String nodeId) throws NiFiClientException, IOException;
 }
diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java
index ffe038e..34e2953 100644
--- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java
+++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java
@@ -28,6 +28,7 @@ import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.MediaType;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Collections;
 import java.util.Map;
 
@@ -271,4 +272,27 @@ public class JerseyConnectionClient extends AbstractJerseyClient implements Conn
             return getRequestBuilder(target).get(FlowFileEntity.class);
         });
     }
+
+    @Override
+    public InputStream getFlowFileContent(final String connectionId, final String flowFileUuid, final String nodeId) throws NiFiClientException, IOException {
+        // The connection and the FlowFile must both be identified; only nodeId may be null.
+        if (connectionId == null) {
+            throw new IllegalArgumentException("Connection ID cannot be null");
+        }
+        if (flowFileUuid == null) {
+            throw new IllegalArgumentException("FlowFile UUID cannot be null");
+        }
+
+        return executeAction("Error retrieving FlowFile Content", () -> {
+            final WebTarget contentTarget = flowFileQueueTarget
+                .path("flowfiles/{uuid}/content")
+                .resolveTemplate("id", connectionId)
+                .resolveTemplate("uuid", flowFileUuid);
+            // A non-null nodeId is passed along as the clusterNodeId query parameter.
+            final WebTarget requestTarget = nodeId == null
+                ? contentTarget
+                : contentTarget.queryParam("clusterNodeId", nodeId);
+            return getRequestBuilder(requestTarget).get(InputStream.class);
+        });
+    }
 }