You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2020/08/06 14:56:09 UTC
[nifi] branch main updated: NIFI-7706,
NIFI-5702: Allow NiFi to keep FlowFiles if their queue is unknown.
This way, if a Flow is inadvertently removed, updated, etc.,
and NiFi is restarted,
the data will not be dropped by default. The old mechanism of dropping data
is exposed via a property
This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new eca7f15 NIFI-7706, NIFI-5702: Allow NiFi to keep FlowFiles if their queue is unknown. This way, if a Flow is inadvertently removed, updated, etc., and NiFi is restarted, the data will not be dropped by default. The old mechanism of dropping data is exposed via a property
eca7f15 is described below
commit eca7f153d0b23d39f14e5c65d1f96c7f663b2bbe
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Wed Aug 5 15:47:05 2020 -0400
NIFI-7706, NIFI-5702: Allow NiFi to keep FlowFiles if their queue is unknown. This way, if a Flow is inadvertently removed, updated, etc., and NiFi is restarted, the data will not be dropped by default. The old mechanism of dropping data is exposed via a property
This closes #4454.
Signed-off-by: Bryan Bende <bb...@apache.org>
---
.../controller/repository/FlowFileRepository.java | 10 ++
.../apache/nifi/controller/flow/FlowManager.java | 5 +
.../org/apache/nifi/controller/FlowController.java | 2 +-
.../nifi/controller/flow/StandardFlowManager.java | 42 +++++++++
.../repository/WriteAheadFlowFileRepository.java | 105 ++++++++++++++++-----
.../bootstrap/tasks/ContentRepositoryScanTask.java | 13 +++
.../nifi-framework/nifi-resources/pom.xml | 1 +
.../src/main/resources/conf/nifi.properties | 1 +
nifi-system-tests/nifi-system-test-suite/pom.xml | 7 ++
.../nifi/tests/system/AggregateNiFiInstance.java | 17 +++-
.../apache/nifi/tests/system/NiFiClientUtil.java | 18 ++++
.../org/apache/nifi/tests/system/NiFiSystemIT.java | 8 +-
.../SpawnedStandaloneNiFiInstanceFactory.java | 4 +
.../system/restart/FlowFileRestorationIT.java | 102 ++++++++++++++++++++
.../resources/conf/clustered/node1/bootstrap.conf | 2 +-
.../resources/conf/clustered/node2/bootstrap.conf | 2 +-
.../cli/impl/client/nifi/ConnectionClient.java | 3 +
.../client/nifi/impl/JerseyConnectionClient.java | 24 +++++
18 files changed, 331 insertions(+), 35 deletions(-)
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
index 0a7bad7..67165ee 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
@@ -23,6 +23,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -178,4 +179,13 @@ public interface FlowFileRepository extends Closeable {
throws IOException {
return null;
}
+
+ /**
+  * Finds the Resource Claims that are held by "orphaned" FlowFiles, i.e., FlowFiles whose FlowFile Queue/Connection
+  * did not exist in the flow when NiFi started.
+  * <p>
+  * This default implementation reports no orphaned Resource Claims.
+  *
+  * @return the set of orphaned Resource Claims; an empty set by default
+  */
+ default Set<ResourceClaim> findOrphanedResourceClaims() {
+     final Set<ResourceClaim> noOrphanedClaims = Collections.emptySet();
+     return noOrphanedClaims;
+ }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java
index ba24705..c4503c4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java
@@ -328,6 +328,11 @@ public interface FlowManager {
ParameterContextManager getParameterContextManager();
/**
+ * Reports how many components of each type exist in the flow.
+ * <p>
+ * NOTE(review): the StandardFlowManager implementation added in this same change reports no Funnel count and reports
+ * ports split into local and public ports - confirm whether the list below or the implementation is the intended contract.
+ *
+ * @return the number of each type of component (Processor, Controller Service, Process Group, Funnel, Input Port, Output Port, Reporting Task, Remote Process Group)
+ */
+ Map<String, Integer> getComponentCounts();
+
+ /**
* Purges all components from the flow, including:
*
* Process Groups (and all components within it)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 97b4c13..500cb15 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -1438,7 +1438,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
}
flowSynchronized.set(true);
- LOG.info("Successfully synchronized controller with proposed flow");
+ LOG.info("Successfully synchronized controller with proposed flow. Flow contains the following number of components: {}", flowManager.getComponentCounts());
} finally {
writeLock.unlock("synchronize");
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
index 4663d62..5c42de2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
@@ -87,6 +87,7 @@ import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -218,6 +219,10 @@ public class StandardFlowManager implements FlowManager {
}
public void setRootGroup(final ProcessGroup rootGroup) {
+ if (this.rootGroup != null && this.rootGroup.isEmpty()) {
+ allProcessGroups.remove(this.rootGroup.getIdentifier());
+ }
+
this.rootGroup = rootGroup;
allProcessGroups.put(ROOT_GROUP_ID_ALIAS, rootGroup);
allProcessGroups.put(rootGroup.getIdentifier(), rootGroup);
@@ -739,6 +744,43 @@ public class StandardFlowManager implements FlowManager {
}
+ /**
+  * Builds a summary of how many components of each type currently exist in the flow.
+  * A LinkedHashMap is used so that the entries keep the order in which they are added below.
+  *
+  * @return a Map whose keys are component type labels and whose values are the number of components of that type
+  */
  @Override
+ public Map<String, Integer> getComponentCounts() {
+     final Map<String, Integer> componentCounts = new LinkedHashMap<>();
+     componentCounts.put("Processors", allProcessors.size());
+     componentCounts.put("Controller Services", getAllControllerServices().size());
+     componentCounts.put("Reporting Tasks", getAllReportingTasks().size());
+     componentCounts.put("Process Groups", allProcessGroups.size() - 2); // -2 to account for the root group because we don't want it in our counts and the 'root group alias' key.
+     componentCounts.put("Remote Process Groups", getRootGroup().findAllRemoteProcessGroups().size());
+
+     int localInputPorts = 0;
+     int publicInputPorts = 0;
+     for (final Port port : allInputPorts.values()) {
+         if (port instanceof PublicPort) {
+             publicInputPorts++;
+         } else {
+             localInputPorts++;
+         }
+     }
+
+     // Same classification as for input ports: a PublicPort counts as public, anything else as local.
+     int localOutputPorts = 0;
+     int publicOutputPorts = 0;
+     for (final Port port : allOutputPorts.values()) {
+         if (port instanceof PublicPort) {
+             publicOutputPorts++;
+         } else {
+             localOutputPorts++;
+         }
+     }
+
+     componentCounts.put("Local Input Ports", localInputPorts);
+     componentCounts.put("Local Output Ports", localOutputPorts);
+     componentCounts.put("Public Input Ports", publicInputPorts);
+     componentCounts.put("Public Output Ports", publicOutputPorts);
+
+     return componentCounts;
+ }
+
+ @Override
public ParameterContext createParameterContext(final String id, final String name, final Map<String, Parameter> parameters) {
final boolean namingConflict = parameterContextManager.getParameterContexts().stream()
.anyMatch(paramContext -> paramContext.getName().equals(name));
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 7f4991d..432e4e4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -16,6 +16,22 @@
*/
package org.apache.nifi.controller.repository;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
+import org.apache.nifi.wali.SnapshotCapture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wali.MinimalLockingWriteAheadLog;
+import org.wali.SyncListener;
+import org.wali.WriteAheadRepository;
+
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
@@ -42,21 +58,6 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
-import org.apache.nifi.wali.SnapshotCapture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wali.MinimalLockingWriteAheadLog;
-import org.wali.SyncListener;
-import org.wali.WriteAheadRepository;
/**
* <p>
@@ -84,6 +85,7 @@ import org.wali.WriteAheadRepository;
public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncListener {
static final String FLOWFILE_REPOSITORY_DIRECTORY_PREFIX = "nifi.flowfile.repository.directory";
private static final String WRITE_AHEAD_LOG_IMPL = "nifi.flowfile.repository.wal.implementation";
+ private static final String RETAIN_ORPHANED_FLOWFILES = "nifi.flowfile.repository.retain.orphaned.flowfiles";
static final String SEQUENTIAL_ACCESS_WAL = "org.apache.nifi.wali.SequentialAccessWriteAheadLog";
static final String ENCRYPTED_SEQUENTIAL_ACCESS_WAL = "org.apache.nifi.wali.EncryptedSequentialAccessWriteAheadLog";
@@ -95,6 +97,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
final AtomicLong flowFileSequenceGenerator = new AtomicLong(0L);
private final boolean alwaysSync;
+ private final boolean retainOrphanedFlowFiles;
private static final Logger logger = LoggerFactory.getLogger(WriteAheadFlowFileRepository.class);
volatile ScheduledFuture<?> checkpointFuture;
@@ -105,6 +108,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
final ScheduledExecutorService checkpointExecutor;
private volatile Collection<SerializedRepositoryRecord> recoveredRecords = null;
+ private final Set<ResourceClaim> orphanedResourceClaims = Collections.synchronizedSet(new HashSet<>());
private final Set<String> swapLocationSuffixes = new HashSet<>(); // guarded by synchronizing on object itself
@@ -145,12 +149,16 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
checkpointExecutor = null;
walImplementation = null;
nifiProperties = null;
+ retainOrphanedFlowFiles = true;
}
public WriteAheadFlowFileRepository(final NiFiProperties nifiProperties) {
alwaysSync = Boolean.parseBoolean(nifiProperties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_ALWAYS_SYNC, "false"));
this.nifiProperties = nifiProperties;
+ final String orphanedFlowFileProperty = nifiProperties.getProperty(RETAIN_ORPHANED_FLOWFILES);
+ retainOrphanedFlowFiles = orphanedFlowFileProperty == null || Boolean.parseBoolean(orphanedFlowFileProperty);
+
// determine the database file path and ensure it exists
String writeAheadLogImpl = nifiProperties.getProperty(WRITE_AHEAD_LOG_IMPL);
if (writeAheadLogImpl == null) {
@@ -865,6 +873,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
queueMap.put(queue.getIdentifier(), queue);
}
+ final List<SerializedRepositoryRecord> dropRecords = new ArrayList<>();
int numFlowFilesMissingQueue = 0;
long maxId = 0;
for (final SerializedRepositoryRecord record : recordList) {
@@ -876,23 +885,52 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
final String queueId = record.getQueueIdentifier();
if (queueId == null) {
numFlowFilesMissingQueue++;
- logger.warn("Encounted Repository Record (id={}) with no Queue Identifier. Dropping this FlowFile", recordId);
+ logger.warn("Encountered Repository Record (id={}) with no Queue Identifier. Dropping this FlowFile", recordId);
+
+ // Add a drop record so that the record is not retained
+ dropRecords.add(new ReconstitutedSerializedRepositoryRecord.Builder()
+ .flowFileRecord(record.getFlowFileRecord())
+ .swapLocation(record.getSwapLocation())
+ .type(RepositoryRecordType.DELETE)
+ .build());
+
continue;
}
+ final ContentClaim claim = record.getContentClaim();
final FlowFileQueue flowFileQueue = queueMap.get(queueId);
- if (flowFileQueue == null) {
+ final boolean orphaned = flowFileQueue == null;
+ if (orphaned) {
numFlowFilesMissingQueue++;
- logger.warn("Encountered Repository Record (id={}) with Queue identifier {} but no Queue exists with that ID. Dropping this FlowFile", recordId, queueId);
- continue;
- }
- flowFileQueue.put(record.getFlowFileRecord());
+ if (isRetainOrphanedFlowFiles()) {
+ if (claim == null) {
+ logger.warn("Encountered Repository Record (id={}) with Queue identifier {} but no Queue exists with that ID. This FlowFile will not be restored to any "
+ + "FlowFile Queue in the flow. However, it will remain in the FlowFile Repository in case the flow containing this queue is later restored.", recordId, queueId);
+ } else {
+ claimManager.incrementClaimantCount(claim.getResourceClaim());
+ orphanedResourceClaims.add(claim.getResourceClaim());
+ logger.warn("Encountered Repository Record (id={}) with Queue identifier {} but no Queue exists with that ID. "
+ + "This FlowFile will not be restored to any FlowFile Queue in the flow. However, it will remain in the FlowFile Repository in "
+ + "case the flow containing this queue is later restored. This may result in the following Content Claim not being cleaned "
+ + "up by the Content Repository: {}", recordId, queueId, claim);
+ }
+ } else {
+ dropRecords.add(new ReconstitutedSerializedRepositoryRecord.Builder()
+ .flowFileRecord(record.getFlowFileRecord())
+ .swapLocation(record.getSwapLocation())
+ .type(RepositoryRecordType.DELETE)
+ .build());
+
+ logger.warn("Encountered Repository Record (id={}) with Queue identifier {} but no Queue exists with that ID. This FlowFile will be dropped.", recordId, queueId);
+ }
- final ContentClaim claim = record.getContentClaim();
- if (claim != null) {
+ continue;
+ } else if (claim != null) {
claimManager.incrementClaimantCount(claim.getResourceClaim());
}
+
+ flowFileQueue.put(record.getFlowFileRecord());
}
// If recoveredRecords has been populated it need to be nulled out now because it is no longer useful and can be garbage collected.
@@ -903,7 +941,17 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
flowFileSequenceGenerator.set(maxId + 1);
logger.info("Successfully restored {} FlowFiles and {} Swap Files", recordList.size() - numFlowFilesMissingQueue, recoveredSwapLocations.size());
if (numFlowFilesMissingQueue > 0) {
- logger.warn("On recovery, found {} FlowFiles whose queue no longer exists. These FlowFiles will be dropped.", numFlowFilesMissingQueue);
+ logger.warn("On recovery, found {} FlowFiles whose queues no longer exists.", numFlowFilesMissingQueue);
+ }
+
+ if (dropRecords.isEmpty()) {
+ logger.debug("No Drop Records to update Repository with");
+ } else {
+ final long updateStart = System.nanoTime();
+ wal.update(dropRecords, true);
+ final long updateEnd = System.nanoTime();
+ final long updateMillis = TimeUnit.MILLISECONDS.convert(updateEnd - updateStart, TimeUnit.NANOSECONDS);
+ logger.info("Successfully updated FlowFile Repository with {} Drop Records due to missing queues in {} milliseconds", dropRecords.size(), updateMillis);
}
final Runnable checkpointRunnable = new Runnable() {
@@ -927,6 +975,15 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
return maxId;
}
+ /**
+  * @return the value of the retainOrphanedFlowFiles flag that was fixed when this repository was constructed
+  */
+ private boolean isRetainOrphanedFlowFiles() {
+     return this.retainOrphanedFlowFiles;
+ }
+
+ /**
+  * Returns the Resource Claims that were recorded as orphaned while FlowFiles were being restored.
+  * A point-in-time copy is returned rather than a view: the backing set is a synchronized set, which must be
+  * locked while it is iterated, and callers have no access to that lock.
+  *
+  * @return an unmodifiable snapshot of the orphaned Resource Claims
+  */
+ @Override
+ public Set<ResourceClaim> findOrphanedResourceClaims() {
+     // Copying iterates the backing set, so the copy itself must be made while holding the set's lock.
+     synchronized (orphanedResourceClaims) {
+         return Collections.unmodifiableSet(new HashSet<>(orphanedResourceClaims));
+     }
+ }
+
@Override
public void updateMaxFlowFileIdentifier(final long maxId) {
while (true) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ContentRepositoryScanTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ContentRepositoryScanTask.java
index 8493099..ca175df 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ContentRepositoryScanTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ContentRepositoryScanTask.java
@@ -86,6 +86,19 @@ public class ContentRepositoryScanTask implements DiagnosticTask {
}
}
+ details.add(""); // Insert empty detail lines to make output more readable.
+
+ final Set<ResourceClaim> orphanedResourceClaims = flowFileRepository.findOrphanedResourceClaims();
+ if (orphanedResourceClaims == null || orphanedResourceClaims.isEmpty()) {
+ details.add("No Resource Claims were referenced by orphaned FlowFiles.");
+ } else {
+ details.add("The following Resource Claims were referenced by orphaned FlowFiles (FlowFiles that exist in the FlowFile Repository but for which the FlowFile's connection/queue" +
+ " did not exist when NiFi started):");
+
+ for (final ResourceClaim claim : orphanedResourceClaims) {
+ details.add(claim.toString());
+ }
+ }
return new StandardDiagnosticsDumpElement("Content Repository Scan", details);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
index f1bf3e1..7084a45 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
@@ -62,6 +62,7 @@
<nifi.flowfile.repository.encryption.key.provider.location />
<nifi.flowfile.repository.encryption.key.id />
<nifi.flowfile.repository.encryption.key />
+ <nifi.flowfile.repository.retain.orphaned.flowfiles>true</nifi.flowfile.repository.retain.orphaned.flowfiles>
<nifi.swap.manager.implementation>org.apache.nifi.controller.FileSystemSwapManager</nifi.swap.manager.implementation>
<nifi.queue.swap.threshold>20000</nifi.queue.swap.threshold>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index 9f2d544..5c0166e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -67,6 +67,7 @@ nifi.flowfile.repository.encryption.key.provider.implementation=${nifi.flowfile.
nifi.flowfile.repository.encryption.key.provider.location=${nifi.flowfile.repository.encryption.key.provider.location}
nifi.flowfile.repository.encryption.key.id=${nifi.flowfile.repository.encryption.key.id}
nifi.flowfile.repository.encryption.key=${nifi.flowfile.repository.encryption.key}
+nifi.flowfile.repository.retain.orphaned.flowfiles=${nifi.flowfile.repository.retain.orphaned.flowfiles}
nifi.swap.manager.implementation=${nifi.swap.manager.implementation}
nifi.queue.swap.threshold=${nifi.queue.swap.threshold}
diff --git a/nifi-system-tests/nifi-system-test-suite/pom.xml b/nifi-system-tests/nifi-system-test-suite/pom.xml
index 426f506..c186323 100644
--- a/nifi-system-tests/nifi-system-test-suite/pom.xml
+++ b/nifi-system-tests/nifi-system-test-suite/pom.xml
@@ -169,6 +169,12 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-server-nar</artifactId>
+ <version>1.12.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-provenance-repository-nar</artifactId>
<version>1.12.0-SNAPSHOT</version>
<type>nar</type>
@@ -192,6 +198,7 @@
<type>nar</type>
</dependency>
+
<!-- dependencies for jaxb/activation/annotation for running NiFi on Java 11 -->
<!-- TODO: remove these once minimum Java version is 11 -->
<dependency>
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/AggregateNiFiInstance.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/AggregateNiFiInstance.java
index 9358e86..b295bba 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/AggregateNiFiInstance.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/AggregateNiFiInstance.java
@@ -18,6 +18,7 @@ package org.apache.nifi.tests.system;
import java.io.File;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -31,9 +32,23 @@ public class AggregateNiFiInstance implements NiFiInstance {
+ /**
+  * Starts every auto-start instance on its own thread so that the instances start up concurrently, then blocks
+  * until all of the startup threads have finished.
+  *
+  * @param waitForCompletion passed through unchanged to each instance's start method
+  */
  @Override
  public void start(boolean waitForCompletion) {
+     final Map<Thread, NiFiInstance> startupThreads = new HashMap<>();
+
      for (final NiFiInstance instance : instances) {
          if (instance.isAutoStart()) {
-             instance.start(waitForCompletion);
+             final Thread t = new Thread(() -> instance.start(waitForCompletion));
+             t.start();
+             startupThreads.put(t, instance);
+         }
+     }
+
+     for (final Map.Entry<Thread, NiFiInstance> entry : startupThreads.entrySet()) {
+         final Thread startupThread = entry.getKey();
+
+         try {
+             startupThread.join();
+         } catch (final InterruptedException ie) {
+             // Restore the interrupt flag so that code further up the stack can still observe the interruption.
+             Thread.currentThread().interrupt();
+             throw new RuntimeException("Interrupted while waiting for instance " + entry.getValue() + " to finish starting", ie);
          }
      }
  }
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 3fabdd7..5eaf8c6 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -82,6 +82,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.io.InputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -774,6 +775,23 @@ public class NiFiClientUtil {
return flowFileEntity;
}
+ /**
+  * Retrieves the content of the FlowFile found at the given position in a listing of the given connection's queue.
+  *
+  * @param connectionId the ID of the connection whose queue is listed
+  * @param flowFileIndex the zero-based position of the desired FlowFile within the queue listing
+  * @return a stream of the FlowFile's content
+  * @throws IllegalArgumentException if the index is not smaller than the number of FlowFiles in the listing
+  * @throws NiFiClientException declared by the client calls that are made
+  * @throws IOException declared by the client calls that are made
+  */
+ public InputStream getFlowFileContent(final String connectionId, final int flowFileIndex) throws NiFiClientException, IOException {
+     final List<FlowFileSummaryDTO> summaries = performQueueListing(connectionId).getListingRequest().getFlowFileSummaries();
+     final int listedCount = summaries.size();
+     if (listedCount <= flowFileIndex) {
+         throw new IllegalArgumentException("Cannot retrieve FlowFile with index " + flowFileIndex + " because queue only has " + listedCount + " FlowFiles");
+     }
+
+     final FlowFileSummaryDTO summary = summaries.get(flowFileIndex);
+     final String flowFileUuid = summary.getUuid();
+     final String clusterNodeId = summary.getClusterNodeId();
+
+     // The FlowFile entity is fetched and the node ID is recorded on it, but the entity is not used afterwards.
+     // NOTE(review): presumably kept so that the FlowFile is looked up before its content is requested - confirm.
+     final FlowFileEntity fetchedEntity = nifiClient.getConnectionClient().getFlowFile(connectionId, flowFileUuid, clusterNodeId);
+     fetchedEntity.getFlowFile().setClusterNodeId(clusterNodeId);
+
+     return nifiClient.getConnectionClient().getFlowFileContent(connectionId, flowFileUuid, clusterNodeId);
+ }
+
public VariableRegistryUpdateRequestEntity updateVariableRegistry(final ProcessGroupEntity processGroup, final Map<String, String> variables) throws NiFiClientException, IOException {
final Set<VariableEntity> variableEntities = new HashSet<>();
for (final Map.Entry<String, String> entry : variables.entrySet()) {
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
index 391a64d..c27f429 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
@@ -16,7 +16,6 @@
*/
package org.apache.nifi.tests.system;
-import org.apache.nifi.processor.Relationship;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientConfig;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
@@ -60,15 +59,10 @@ public abstract class NiFiSystemIT {
private static final File LIB_DIR = new File("target/nifi-lib-assembly/lib");
private static volatile String nifiFrameworkVersion = null;
- protected static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("Convenience Relationship for use in tests")
- .build();
-
@Rule
public TestName name = new TestName();
@Rule
- public Timeout defaultTimeout = new Timeout(2, TimeUnit.MINUTES);
+ public Timeout defaultTimeout = new Timeout(5, TimeUnit.MINUTES);
private NiFiClient nifiClient;
private NiFiClientUtil clientUtil;
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java
index c19a755..412dd0f 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java
@@ -78,6 +78,10 @@ public class SpawnedStandaloneNiFiInstanceFactory implements NiFiInstanceFactory
}
}
+ /**
+  * @return a description of this instance that includes its instance directory
+  */
+ @Override
+ public String toString() {
+     return "RunNiFiInstance[dir=" + instanceDirectory + "]";
+ }
+
@Override
public void start(final boolean waitForCompletion) {
if (runNiFi != null) {
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/restart/FlowFileRestorationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/restart/FlowFileRestorationIT.java
new file mode 100644
index 0000000..c9003cb
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/restart/FlowFileRestorationIT.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.restart;
+
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.tests.system.NiFiInstance;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.Collections;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+ /**
+  * System test verifying that a FlowFile whose queue is missing from the flow when NiFi starts is not lost, and that it
+  * is available again, with unchanged content, once the flow containing the queue has been restored.
+  */
+ public class FlowFileRestorationIT extends NiFiSystemIT {
+
+     /**
+      * Queues a single 1 KB FlowFile, restarts NiFi without its flow.xml.gz so that the connection no longer exists,
+      * then restores the flow.xml.gz, restarts again, and checks that the FlowFile is queued with identical content.
+      */
+     @Test
+     public void testDataInMissingQueueRestoredWhenQueueRestored() throws NiFiClientException, IOException, InterruptedException {
+         final ProcessorEntity generator = getClientUtil().createProcessor("GenerateFlowFile");
+         getClientUtil().updateProcessorProperties(generator, Collections.singletonMap("File Size", "1 KB"));
+         getClientUtil().updateProcessorSchedulingPeriod(generator, "100 min");
+
+         final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
+         final ConnectionEntity connection = getClientUtil().createConnection(generator, terminate, "success");
+
+         getNifiClient().getProcessorClient().startProcessor(generator);
+         waitForQueueCount(connection.getId(), 1);
+         getNifiClient().getProcessorClient().stopProcessor(generator);
+
+         final byte[] flowFileContents = getFlowFileContents(connection.getId(), 0);
+
+         assertEquals(1024, flowFileContents.length);
+
+         final NiFiInstance nifiInstance = getNiFiInstance();
+         nifiInstance.stop();
+
+         // Keep a copy of the flow in memory and remove it from disk so that the next start has no flow to load.
+         final File nifiHome = nifiInstance.getInstanceDirectory();
+         final File confDir = new File(nifiHome, "conf");
+         final File flowXmlGz = new File(confDir, "flow.xml.gz");
+         final byte[] flowXmlGzBytes = Files.readAllBytes(flowXmlGz.toPath());
+         assertTrue(flowXmlGz.delete());
+
+         nifiInstance.start();
+
+         try {
+             getNifiClient().getConnectionClient().getConnection(connection.getId());
+             Assert.fail("Didn't expect to retrieve a connection");
+         } catch (final NiFiClientException expected) {
+             // Expected because the connection no longer exists.
+         }
+
+         // Stop the instance, restore the flow.xml.gz, and restart
+         nifiInstance.stop();
+         // Truncate any flow.xml.gz that may have been written while the instance ran without one. With CREATE alone an
+         // existing file is overwritten in place, and any bytes beyond the restored length would be left behind.
+         Files.write(flowXmlGz.toPath(), flowXmlGzBytes, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
+         nifiInstance.start();
+
+         // Ensure that there's a FlowFile queued up and that its contents are still accessible and have not changed.
+         final ConnectionEntity retrievedConnection = getNifiClient().getConnectionClient().getConnection(connection.getId());
+         assertNotNull(retrievedConnection);
+         waitForQueueCount(connection.getId(), 1);
+         final byte[] contentsAfterRestart = getFlowFileContents(connection.getId(), 0);
+
+         assertArrayEquals(flowFileContents, contentsAfterRestart);
+     }
+
+     /**
+      * Reads the full content of the FlowFile at the given position in the given connection's queue into memory.
+      *
+      * @param connectionId the ID of the connection whose queue holds the FlowFile
+      * @param flowFileIndex the position of the FlowFile within the queue listing
+      * @return the FlowFile's content
+      */
+     private byte[] getFlowFileContents(final String connectionId, final int flowFileIndex) throws IOException, NiFiClientException {
+         try (final InputStream in = getClientUtil().getFlowFileContent(connectionId, flowFileIndex);
+              final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+
+             StreamUtils.copy(in, baos);
+             return baos.toByteArray();
+         }
+     }
+ }
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/bootstrap.conf b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/bootstrap.conf
index 297108b..9c00c7c 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/bootstrap.conf
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/bootstrap.conf
@@ -24,7 +24,7 @@ graceful.shutdown.seconds=20
# JVM memory settings
java.arg.2= -Xms128m
-java.arg.3=-Xmx128m
+java.arg.3=-Xmx256m
java.arg.14=-Djava.awt.headless=true
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/bootstrap.conf b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/bootstrap.conf
index 0f4b8f3..66c7e1f 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/bootstrap.conf
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/bootstrap.conf
@@ -24,7 +24,7 @@ graceful.shutdown.seconds=20
# JVM memory settings
java.arg.2= -Xms128m
-java.arg.3=-Xmx128m
+java.arg.3=-Xmx256m
java.arg.14=-Djava.awt.headless=true
diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java
index f5c07d1..ddd337d 100644
--- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java
+++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java
@@ -22,6 +22,7 @@ import org.apache.nifi.web.api.entity.FlowFileEntity;
import org.apache.nifi.web.api.entity.ListingRequestEntity;
import java.io.IOException;
+import java.io.InputStream;
public interface ConnectionClient {
ConnectionEntity getConnection(String id) throws NiFiClientException, IOException;
@@ -49,4 +50,6 @@ public interface ConnectionClient {
FlowFileEntity getFlowFile(String connectionId, String flowFileUuid) throws NiFiClientException, IOException;
FlowFileEntity getFlowFile(String connectionId, String flowFileUuid, String nodeId) throws NiFiClientException, IOException;
+
+ InputStream getFlowFileContent(String connectionId, String flowFileUuid, String nodeId) throws NiFiClientException, IOException;
}
diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java
index ffe038e..34e2953 100644
--- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java
+++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java
@@ -28,6 +28,7 @@ import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
+import java.io.InputStream;
import java.util.Collections;
import java.util.Map;
@@ -271,4 +272,27 @@ public class JerseyConnectionClient extends AbstractJerseyClient implements Conn
return getRequestBuilder(target).get(FlowFileEntity.class);
});
}
+
+ @Override
+ public InputStream getFlowFileContent(final String connectionId, final String flowFileUuid, final String nodeId) throws NiFiClientException, IOException {
+ // Reject missing identifiers up front, before any request is built. A null nodeId is allowed.
+ if (connectionId == null) {
+ throw new IllegalArgumentException("Connection ID cannot be null");
+ }
+ if (flowFileUuid == null) {
+ throw new IllegalArgumentException("FlowFile UUID cannot be null");
+ }
+
+ return executeAction("Error retrieving FlowFile Content", () -> {
+ final WebTarget contentTarget = flowFileQueueTarget
+ .path("flowfiles/{uuid}/content")
+ .resolveTemplate("id", connectionId)
+ .resolveTemplate("uuid", flowFileUuid);
+
+ // The clusterNodeId query parameter is only added when a node ID was supplied.
+ final WebTarget requestTarget = nodeId == null ? contentTarget : contentTarget.queryParam("clusterNodeId", nodeId);
+
+ return getRequestBuilder(requestTarget).get(InputStream.class);
+ });
+ }
}