You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2023/02/22 20:38:32 UTC
[nifi] branch main updated: NIFI-11192: Ensure that if ports moved between parent/child group in between flow versions that we can properly handle that. Added system tests to verify.
This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 87e61c50ee NIFI-11192: Ensure that if ports moved between parent/child group in between flow versions that we can properly handle that. Added system tests to verify.
87e61c50ee is described below
commit 87e61c50eefeef46e6fc5e0080a70d2605a2799b
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Feb 21 11:28:54 2023 -0500
NIFI-11192: Ensure that if ports moved between parent/child group in between flow versions that we can properly handle that. Added system tests to verify.
NIFI-11192: If a failure is encountered when changing the version of a flow from 1 version to another, attempt to rollback the changes instead of just failing with the flow in a bad state
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #6981
---
.../StandardVersionedComponentSynchronizer.java | 41 ++-
.../apache/nifi/groups/StandardProcessGroup.java | 3 +-
.../apache/nifi/util/FlowDifferenceFilters.java | 38 +++
.../apache/nifi/web/api/FlowUpdateResource.java | 121 +++++---
.../registry/FileSystemFlowRegistryClient.java | 47 ++++
.../apache/nifi/tests/system/NiFiClientUtil.java | 70 +++++
.../system/registry/ClusteredRegistryClientIT.java | 29 ++
.../tests/system/registry/RegistryClientIT.java | 84 +++++-
.../src/test/resources/conf/default/bootstrap.conf | 2 +-
.../src/test/resources/conf/default/logback.xml | 1 +
.../test/resources/conf/default/nifi.properties | 2 +-
.../flow-with-invalid-connection/1/snapshot.json | 305 +++++++++++++++++++++
.../flow-with-invalid-connection/2/snapshot.json | 156 +++++++++++
.../test-flows/port-moved-groups/1/snapshot.json | 305 +++++++++++++++++++++
.../test-flows/port-moved-groups/2/snapshot.json | 1 +
.../cli/impl/client/nifi/ProcessGroupClient.java | 2 +
.../client/nifi/impl/JerseyProcessGroupClient.java | 16 ++
17 files changed, 1178 insertions(+), 45 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
index f7e64336ee..5659567149 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
@@ -683,18 +683,17 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
// If the Connection's destination didn't change, nothing to do
final String destinationVersionId = connection.getDestination().getVersionedComponentId().orElse(null);
final String proposedDestinationId = proposedConnection.getDestination().getId();
- if (Objects.equals(destinationVersionId, proposedDestinationId)) {
+ final String destinationGroupVersionId = connection.getDestination().getProcessGroup().getVersionedComponentId().orElse(null);
+ final String proposedDestinationGroupId = proposedConnection.getDestination().getGroupId();
+ if (Objects.equals(destinationVersionId, proposedDestinationId) && Objects.equals(destinationGroupVersionId, proposedDestinationGroupId)) {
continue;
}
// Find the destination of the connection. If the destination doesn't yet exist (because it's part of the proposed Process Group but not yet added),
// we will set the destination to a temporary destination. Then, after adding components, we will update the destinations again.
Connectable newDestination = getConnectable(group, proposedConnection.getDestination());
- if (
- newDestination == null
- ||
- (newDestination.getConnectableType() == ConnectableType.OUTPUT_PORT && !newDestination.getProcessGroup().equals(connection.getProcessGroup()))
- ) {
+ final boolean useTempDestination = isTempDestinationNecessary(connection, proposedConnection, newDestination);
+ if (useTempDestination) {
final Funnel temporaryDestination = getTemporaryFunnel(connection.getProcessGroup());
LOG.debug("Updated Connection {} to have a temporary destination of {}", connection, temporaryDestination);
newDestination = temporaryDestination;
@@ -707,6 +706,36 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
return connectionsWithTempDestination;
}
+ private boolean isTempDestinationNecessary(final Connection existingConnection, final VersionedConnection proposedConnection, final Connectable newDestination) {
+ if (newDestination == null) {
+ return true;
+ }
+
+ // If the destination is an Input Port or an Output Port and the group changed, use a temp destination
+ final ConnectableType connectableType = newDestination.getConnectableType();
+ final boolean port = connectableType == ConnectableType.OUTPUT_PORT || connectableType == ConnectableType.INPUT_PORT;
+ final boolean groupChanged = !newDestination.getProcessGroup().equals(existingConnection.getProcessGroup());
+ if (port && groupChanged) {
+ return true;
+ }
+
+ // If the proposed destination has a different group than the existing group, use a temp destination.
+ final String proposedDestinationGroupId = proposedConnection.getDestination().getGroupId();
+ final String destinationGroupVersionedComponentId = existingConnection.getDestination().getProcessGroup().getVersionedComponentId().orElse(null);
+ if (!Objects.equals(proposedDestinationGroupId, destinationGroupVersionedComponentId)) {
+ return true;
+ }
+
+ // If the proposed connection exists in a different group than the existing group, use a temp destination.
+ final String connectionGroupVersionedComponentId = existingConnection.getProcessGroup().getVersionedComponentId().orElse(null);
+ final String proposedGroupId = proposedConnection.getGroupIdentifier();
+ if (!Objects.equals(proposedGroupId, connectionGroupVersionedComponentId)) {
+ return true;
+ }
+
+ return false;
+ }
+
private Funnel getTemporaryFunnel(final ProcessGroup group) {
final String tempFunnelId = group.getIdentifier() + TEMP_FUNNEL_ID_SUFFIX;
Funnel temporaryFunnel = context.getFlowManager().getFunnel(tempFunnelId);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 1206256073..6b90bb10d7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -4075,7 +4075,8 @@ public final class StandardProcessGroup implements ProcessGroup {
final VersionControlInformation versionControlInfo = getVersionControlInformation();
if (versionControlInfo != null) {
if (!versionControlInfo.getFlowIdentifier().equals(updatedFlow.getMetadata().getFlowIdentifier())) {
- throw new IllegalStateException(this + " is under version control but the given flow does not match the flow that this Process Group is synchronized with");
+ throw new IllegalStateException(this + " is under version control but the given flow does not match the flow that this Process Group is synchronized with. Currently synced to " +
+ "flow with ID " + versionControlInfo.getFlowIdentifier() + " but proposed flow's metadata shows flow identifier as " + updatedFlow.getMetadata().getFlowIdentifier());
}
if (verifyNotDirty) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
index c9e07622e0..db714946db 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
@@ -58,6 +58,7 @@ public class FlowDifferenceFilters {
public static boolean isEnvironmentalChange(final FlowDifference difference, final VersionedProcessGroup localGroup, final FlowManager flowManager) {
return difference.getDifferenceType() == DifferenceType.BUNDLE_CHANGED
|| isVariableValueChange(difference)
+ || isSensitivePropertyDueToGhosting(difference, flowManager)
|| isAncestorVariableAdded(difference, flowManager)
|| isRpgUrlChange(difference)
|| isAddedOrRemovedRemotePort(difference)
@@ -75,6 +76,43 @@ public class FlowDifferenceFilters {
|| isParameterContextChange(difference);
}
+ private static boolean isSensitivePropertyDueToGhosting(final FlowDifference difference, final FlowManager flowManager) {
+ if (difference.getDifferenceType() != DifferenceType.PROPERTY_SENSITIVITY_CHANGED) {
+ return false;
+ }
+
+ final String componentAId = difference.getComponentA().getInstanceIdentifier();
+ if (componentAId != null) {
+ final ComponentNode componentNode = getComponent(flowManager, difference.getComponentA().getComponentType(), componentAId);
+ if (componentNode != null && componentNode.isExtensionMissing()) {
+ return true;
+ }
+ }
+
+ final String componentBId = difference.getComponentB().getInstanceIdentifier();
+ if (componentBId != null) {
+ final ComponentNode componentNode = getComponent(flowManager, difference.getComponentA().getComponentType(), componentBId);
+ if (componentNode != null && componentNode.isExtensionMissing()) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private static ComponentNode getComponent(final FlowManager flowManager, final ComponentType componentType, final String componentId) {
+ switch (componentType) {
+ case CONTROLLER_SERVICE:
+ return flowManager.getControllerServiceNode(componentId);
+ case PROCESSOR:
+ return flowManager.getProcessorNode(componentId);
+ case REPORTING_TASK:
+ return flowManager.getReportingTaskNode(componentId);
+ }
+
+ return null;
+ }
+
// The Registry URL may change if, for instance, a registry is moved to a new host, or is made secure, the port changes, etc.
// Since this can be handled by the client anyway, there's no need to flag this as a 'local modification'
private static boolean isRegistryUrlChange(final FlowDifference difference) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java
index 3bc971d25b..6b3311c572 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java
@@ -28,10 +28,10 @@ import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.FlowRegistryUtils;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
-import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.ResumeFlowException;
@@ -53,6 +53,7 @@ import org.apache.nifi.web.api.entity.PortEntity;
import org.apache.nifi.web.api.entity.ProcessGroupDescriptorEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
import org.apache.nifi.web.util.AffectedComponentUtils;
import org.apache.nifi.web.util.CancellableTimedPause;
import org.apache.nifi.web.util.ComponentLifecycle;
@@ -375,47 +376,32 @@ public abstract class FlowUpdateResource<T extends ProcessGroupDescriptorEntity,
}
asyncRequest.markStepComplete();
+ // Get the Original Flow Snapshot in case we fail to update and need to rollback
+ final VersionControlInformationEntity vciEntity = serviceFacade.getVersionControlInformation(groupId);
+ final RegisteredFlowSnapshot originalFlowSnapshot = serviceFacade.getVersionedFlowSnapshot(vciEntity.getVersionControlInformation(), true);
+
try {
if (replicateRequest) {
// If replicating request, steps 9-11 are performed on each node individually
+ final URI replicateUri = buildUri(requestUri, replicateUriPath, null);
final NiFiUser user = NiFiUserUtils.getNiFiUser();
- URI replicateUri = null;
try {
- replicateUri = new URI(requestUri.getScheme(), requestUri.getUserInfo(), requestUri.getHost(), requestUri.getPort(),
- replicateUriPath, null, requestUri.getFragment());
- } catch (URISyntaxException e) {
- throw new RuntimeException(e);
- }
-
- final Map<String, String> headers = new HashMap<>();
- headers.put("content-type", MediaType.APPLICATION_JSON);
-
- // each concrete class creates its own type of entity for replication
- final Entity replicateEntity = createReplicateUpdateFlowEntity(revision, requestEntity, flowSnapshot);
-
- final NodeResponse clusterResponse;
- try {
- logger.debug("Replicating PUT request to {} for user {}", replicateUri, user);
-
- if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
- clusterResponse = getRequestReplicator().replicate(user, HttpMethod.PUT, replicateUri, replicateEntity, headers).awaitMergedResponse();
+ final NodeResponse clusterResponse = replicateFlowUpdateRequest(replicateUri, user, requestEntity, revision, flowSnapshot);
+ verifyResponseCode(clusterResponse, replicateUri, user, "update");
+ } catch (final Exception e) {
+ if (originalFlowSnapshot == null) {
+ logger.debug("Failed to update flow but could not determine original flow to rollback to so will not make any attempt to revert the flow.");
} else {
- clusterResponse = getRequestReplicator().forwardToCoordinator(
- getClusterCoordinatorNode(), user, HttpMethod.PUT, replicateUri, replicateEntity, headers).awaitMergedResponse();
+ try {
+ final NodeResponse rollbackResponse = replicateFlowUpdateRequest(replicateUri, user, requestEntity, revision, originalFlowSnapshot);
+ verifyResponseCode(rollbackResponse, replicateUri, user, "rollback");
+ } catch (final Exception inner) {
+ e.addSuppressed(inner);
+ }
}
- } catch (final InterruptedException ie) {
- logger.warn("Interrupted while replicating PUT request to {} for user {}", replicateUri, user);
- Thread.currentThread().interrupt();
- throw new LifecycleManagementException("Interrupted while updating flows across cluster", ie);
- }
- final int updateFlowStatus = clusterResponse.getStatus();
- if (updateFlowStatus != Status.OK.getStatusCode()) {
- final String explanation = getResponseEntity(clusterResponse, String.class);
- logger.error("Failed to update flow across cluster when replicating PUT request to {} for user {}. Received {} response with explanation: {}",
- replicateUri, user, updateFlowStatus, explanation);
- throw new LifecycleManagementException("Failed to update Flow on all nodes in cluster due to " + explanation);
+ throw e;
}
} else {
// Step 9: Ensure that if any connection exists in the flow and does not exist in the proposed snapshot,
@@ -425,7 +411,27 @@ public abstract class FlowUpdateResource<T extends ProcessGroupDescriptorEntity,
// Step 10-11. Update Process Group to the new flow and update variable registry with any Variables that were added or removed.
// Each concrete class defines its own update flow functionality
- performUpdateFlow(groupId, revision, requestEntity, flowSnapshot, idGenerationSeed, !allowDirtyFlowUpdate, true);
+ try {
+ performUpdateFlow(groupId, revision, requestEntity, flowSnapshot, idGenerationSeed, !allowDirtyFlowUpdate, true);
+ } catch (final Exception e) {
+ // If clustered, just throw the original Exception.
+ // Otherwise, rollback the flow update. We do not perform the rollback if clustered because
+ // we want this to be handled at a higher level, allowing the request to replace our flow version to come from the coordinator
+ // if any node fails to perform the update.
+ if (isClustered()) {
+ throw e;
+ }
+
+ // Rollback the update to the original flow snapshot. If there's any Exception, add it as a Suppressed Exception to the original so
+ // that it can be logged but not overtake the original Exception as the cause.
+ try {
+ performUpdateFlow(groupId, revision, requestEntity, originalFlowSnapshot, idGenerationSeed, false, true);
+ } catch (final Exception inner) {
+ e.addSuppressed(inner);
+ }
+
+ throw e;
+ }
}
} finally {
if (!asyncRequest.isCancelled()) {
@@ -530,6 +536,53 @@ public abstract class FlowUpdateResource<T extends ProcessGroupDescriptorEntity,
asyncRequest.setCancelCallback(null);
}
+ private URI buildUri(final URI requestUri, final String path, final String query) {
+ try {
+ return new URI(requestUri.getScheme(), requestUri.getUserInfo(), requestUri.getHost(), requestUri.getPort(),
+ path, query, requestUri.getFragment());
+ } catch (final URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void verifyResponseCode(final NodeResponse response, final URI uri, final NiFiUser user, final String actionDescription) throws LifecycleManagementException {
+ final int updateFlowStatus = response.getStatus();
+ if (updateFlowStatus != Status.OK.getStatusCode()) {
+ final String explanation = getResponseEntity(response, String.class);
+ logger.error("Failed to {} flow update across cluster when replicating PUT request to {} for user {}. Received {} response with explanation: {}",
+ actionDescription, uri, user, updateFlowStatus, explanation);
+ throw new LifecycleManagementException("Failed to " + actionDescription + " flow on all nodes in cluster due to " + explanation);
+ }
+ }
+
+ private NodeResponse replicateFlowUpdateRequest(final URI replicateUri, final NiFiUser user, final T requestEntity, final Revision revision, final RegisteredFlowSnapshot flowSnapshot)
+ throws LifecycleManagementException {
+
+ final Map<String, String> headers = new HashMap<>();
+ headers.put("content-type", MediaType.APPLICATION_JSON);
+
+ // each concrete class creates its own type of entity for replication
+ final Entity replicateEntity = createReplicateUpdateFlowEntity(revision, requestEntity, flowSnapshot);
+
+ final NodeResponse clusterResponse;
+ try {
+ logger.debug("Replicating PUT request to {} for user {}", replicateUri, user);
+
+ if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+ clusterResponse = getRequestReplicator().replicate(user, HttpMethod.PUT, replicateUri, replicateEntity, headers).awaitMergedResponse();
+ } else {
+ clusterResponse = getRequestReplicator().forwardToCoordinator(
+ getClusterCoordinatorNode(), user, HttpMethod.PUT, replicateUri, replicateEntity, headers).awaitMergedResponse();
+ }
+ } catch (final InterruptedException ie) {
+ logger.warn("Interrupted while replicating PUT request to {} for user {}", replicateUri, user);
+ Thread.currentThread().interrupt();
+ throw new LifecycleManagementException("Interrupted while updating flows across cluster", ie);
+ }
+
+ return clusterResponse;
+ }
+
/**
* Get a list of steps to perform for upload flow
*/
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/flow/registry/FileSystemFlowRegistryClient.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/flow/registry/FileSystemFlowRegistryClient.java
index f47846361c..8bf24b6b59 100644
--- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/flow/registry/FileSystemFlowRegistryClient.java
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/flow/registry/FileSystemFlowRegistryClient.java
@@ -48,6 +48,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.OptionalInt;
import java.util.Set;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
@@ -209,13 +210,59 @@ public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
final File versionDir = new File(flowDir, String.valueOf(version));
final File snapshotFile = new File(versionDir, "snapshot.json");
+ final Pattern intPattern = Pattern.compile("\\d+");
+ final File[] versionFiles = flowDir.listFiles(file -> intPattern.matcher(file.getName()).matches());
+
final JsonFactory factory = new JsonFactory(objectMapper);
try (final JsonParser parser = factory.createParser(snapshotFile)) {
final RegisteredFlowSnapshot snapshot = parser.readValueAs(RegisteredFlowSnapshot.class);
+ populateBucket(snapshot, bucketId);
+ populateFlow(snapshot, bucketId, flowId, version, versionFiles == null ? 0 : versionFiles.length);
+
return snapshot;
}
}
+ private void populateBucket(final RegisteredFlowSnapshot snapshot, final String bucketId) {
+ final FlowRegistryBucket existingBucket = snapshot.getBucket();
+ if (existingBucket != null) {
+ return;
+ }
+
+ final FlowRegistryBucket bucket = new FlowRegistryBucket();
+ bucket.setCreatedTimestamp(System.currentTimeMillis());
+ bucket.setIdentifier(bucketId);
+ bucket.setName(bucketId);
+ bucket.setPermissions(createAllowAllPermissions());
+ snapshot.setBucket(bucket);
+
+ snapshot.getSnapshotMetadata().setBucketIdentifier(bucketId);
+ }
+
+ private void populateFlow(final RegisteredFlowSnapshot snapshot, final String bucketId, final String flowId, final int version, final int numVersions) {
+ final RegisteredFlow existingFlow = snapshot.getFlow();
+ if (existingFlow != null) {
+ return;
+ }
+
+ final RegisteredFlow flow = new RegisteredFlow();
+ flow.setCreatedTimestamp(System.currentTimeMillis());
+ flow.setLastModifiedTimestamp(System.currentTimeMillis());
+ flow.setBucketIdentifier(bucketId);
+ flow.setBucketName(bucketId);
+ flow.setIdentifier(flowId);
+ flow.setName(flowId);
+ flow.setPermissions(createAllowAllPermissions());
+ flow.setVersionCount(numVersions);
+
+ final RegisteredFlowVersionInfo versionInfo = new RegisteredFlowVersionInfo();
+ versionInfo.setVersion(version);
+ flow.setVersionInfo(versionInfo);
+
+ snapshot.setFlow(flow);
+ snapshot.getSnapshotMetadata().setFlowIdentifier(flowId);
+ }
+
@Override
public RegisteredFlowSnapshot registerFlowSnapshot(final FlowRegistryClientConfigurationContext context, final RegisteredFlowSnapshot flowSnapshot) throws IOException {
final File rootDir = getRootDirectory(context);
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index b6f1c37623..0fb983a8f5 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -38,6 +38,7 @@ import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.CounterDTO;
import org.apache.nifi.web.api.dto.CountersSnapshotDTO;
+import org.apache.nifi.web.api.dto.DifferenceDTO;
import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
import org.apache.nifi.web.api.dto.FlowRegistryClientDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
@@ -75,6 +76,7 @@ import org.apache.nifi.web.api.entity.ControllerServicesEntity;
import org.apache.nifi.web.api.entity.CopySnippetRequestEntity;
import org.apache.nifi.web.api.entity.CountersEntity;
import org.apache.nifi.web.api.entity.DropRequestEntity;
+import org.apache.nifi.web.api.entity.FlowComparisonEntity;
import org.apache.nifi.web.api.entity.FlowEntity;
import org.apache.nifi.web.api.entity.FlowFileEntity;
import org.apache.nifi.web.api.entity.FlowRegistryClientEntity;
@@ -1652,6 +1654,74 @@ public class NiFiClientUtil {
return nifiClient.getProcessGroupClient().createProcessGroup(parentGroupId, groupEntity);
}
+ public VersionedFlowUpdateRequestEntity changeFlowVersion(final String processGroupId, final int version) throws NiFiClientException, IOException, InterruptedException {
+ final ProcessGroupEntity groupEntity = nifiClient.getProcessGroupClient().getProcessGroup(processGroupId);
+ final ProcessGroupDTO groupDto = groupEntity.getComponent();
+ final VersionControlInformationDTO vciDto = groupDto.getVersionControlInformation();
+ if (vciDto == null) {
+ throw new IllegalArgumentException("Process Group with ID " + processGroupId + " is not under Version Control");
+ }
+
+ vciDto.setVersion(version);
+
+ final VersionControlInformationEntity requestEntity = new VersionControlInformationEntity();
+ requestEntity.setProcessGroupRevision(groupEntity.getRevision());
+ requestEntity.setVersionControlInformation(vciDto);
+
+ final VersionedFlowUpdateRequestEntity result = nifiClient.getVersionsClient().updateVersionControlInfo(processGroupId, requestEntity);
+ return waitForVersionFlowUpdateComplete(result.getRequest().getRequestId());
+ }
+
+ public VersionedFlowUpdateRequestEntity waitForVersionFlowUpdateComplete(final String updateRequestId) throws NiFiClientException, IOException, InterruptedException {
+ while (true) {
+ final VersionedFlowUpdateRequestEntity result = nifiClient.getVersionsClient().getUpdateRequest(updateRequestId);
+ final boolean complete = result.getRequest().isComplete();
+ if (complete) {
+ return nifiClient.getVersionsClient().deleteUpdateRequest(updateRequestId);
+ }
+
+ logger.debug("Waiting for Version Flow Update request to complete...");
+ Thread.sleep(100L);
+ }
+ }
+
+ public void assertFlowStaleAndUnmodified(final String processGroupId) throws NiFiClientException, IOException {
+ final String state = nifiClient.getProcessGroupClient().getProcessGroup(processGroupId).getVersionedFlowState();
+ if ("STALE".equalsIgnoreCase(state)) {
+ return;
+ }
+
+ if ("LOCALLY_MODIFIED_AND_STALE".equalsIgnoreCase(state)) {
+ final FlowComparisonEntity flowComparisonEntity = nifiClient.getProcessGroupClient().getLocalModifications(processGroupId);
+ final String differences = flowComparisonEntity.getComponentDifferences().stream()
+ .flatMap(dto -> dto.getDifferences().stream())
+ .map(DifferenceDTO::getDifference)
+ .collect(Collectors.joining("\n"));
+ throw new AssertionError("Expected state to be STALE but was " + state + " with the following modifications:\n" + differences);
+ }
+
+ throw new AssertionError("Expected state to be STALE but was " + state);
+ }
+
+ public void assertFlowUpToDate(final String processGroupId) throws NiFiClientException, IOException {
+ final String state = nifiClient.getProcessGroupClient().getProcessGroup(processGroupId).getVersionedFlowState();
+ if ("UP_TO_DATE".equalsIgnoreCase(state)) {
+ return;
+ }
+
+ if ("LOCALLY_MODIFIED".equalsIgnoreCase(state)) {
+ final FlowComparisonEntity flowComparisonEntity = nifiClient.getProcessGroupClient().getLocalModifications(processGroupId);
+ final String differences = flowComparisonEntity.getComponentDifferences().stream()
+ .flatMap(dto -> dto.getDifferences().stream())
+ .map(DifferenceDTO::getDifference)
+ .collect(Collectors.joining("\n"));
+ throw new AssertionError("Expected state to be UP_TO_DATE but was LOCALLY_MODIFIED with the following modifications:\n" + differences);
+ }
+
+ throw new AssertionError("Expected state to be UP_TO_DATE but was " + state);
+
+ }
+
public FlowEntity copyAndPaste(final ProcessGroupEntity pgEntity, final String destinationGroupId) throws NiFiClientException, IOException {
final SnippetDTO snippetDto = new SnippetDTO();
snippetDto.setProcessGroups(Collections.singletonMap(pgEntity.getId(), pgEntity.getRevision()));
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/ClusteredRegistryClientIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/ClusteredRegistryClientIT.java
new file mode 100644
index 0000000000..d592323775
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/ClusteredRegistryClientIT.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.registry;
+
+import org.apache.nifi.tests.system.NiFiInstanceFactory;
+
+public class ClusteredRegistryClientIT extends RegistryClientIT {
+
+ @Override
+ public NiFiInstanceFactory getInstanceFactory() {
+ return createTwoNodeInstanceFactory();
+ }
+
+}
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
index 9d9334951e..40bf8c32b7 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
@@ -20,6 +20,7 @@ package org.apache.nifi.tests.system.registry;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
@@ -30,6 +31,7 @@ import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
+import org.apache.nifi.web.api.entity.VersionedFlowUpdateRequestEntity;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -37,14 +39,87 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Collections;
+import java.util.List;
import java.util.Set;
import java.util.UUID;
+import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class RegistryClientIT extends NiFiSystemIT {
+
+ @Test
+ public void testChangeVersionWithPortMoveBetweenGroups() throws NiFiClientException, IOException, InterruptedException {
+ final FlowRegistryClientEntity clientEntity = registerClient(new File("src/test/resources/versioned-flows"));
+
+ final ProcessGroupEntity imported = getClientUtil().importFlowFromRegistry("root", clientEntity.getId(), "test-flows", "port-moved-groups", 1);
+ assertNotNull(imported);
+ getClientUtil().assertFlowStaleAndUnmodified(imported.getId());
+
+ // Ensure that the import worked as expected
+ final FlowSnippetDTO groupContents = imported.getComponent().getContents();
+ final List<ProcessorDTO> replaceTextProcessors = groupContents.getProcessors().stream()
+ .filter(proc -> proc.getName().equals("ReplaceText"))
+ .collect(Collectors.toList());
+ assertEquals(1, replaceTextProcessors.size());
+
+ assertTrue(groupContents.getInputPorts().isEmpty());
+
+ // Change to version 2
+ final VersionedFlowUpdateRequestEntity version2Result = getClientUtil().changeFlowVersion(imported.getId(), 2);
+ assertNull(version2Result.getRequest().getFailureReason());
+
+ final FlowDTO v2Contents = getNifiClient().getFlowClient().getProcessGroup(imported.getId()).getProcessGroupFlow().getFlow();
+ getClientUtil().assertFlowUpToDate(imported.getId());
+
+ // Ensure that the ReplaceText processor still exists
+ final long replaceTextCount = v2Contents.getProcessors().stream()
+ .map(ProcessorEntity::getComponent)
+ .filter(proc -> proc.getName().equals("ReplaceText"))
+ .filter(proc -> proc.getId().equals(replaceTextProcessors.get(0).getId()))
+ .count();
+ assertEquals(1, replaceTextCount);
+
+ // Ensure that we now have a Port at the top level
+ assertEquals(1, v2Contents.getInputPorts().size());
+
+ // Change back to Version 1
+ final VersionedFlowUpdateRequestEntity changeBackToV1Result = getClientUtil().changeFlowVersion(imported.getId(), 1);
+ assertNull(changeBackToV1Result.getRequest().getFailureReason());
+
+ final FlowDTO v1Contents = getNifiClient().getFlowClient().getProcessGroup(imported.getId()).getProcessGroupFlow().getFlow();
+ getClientUtil().assertFlowStaleAndUnmodified(imported.getId());
+
+ // Ensure that we no longer have a Port at the top level
+ assertTrue(v1Contents.getInputPorts().isEmpty());
+ }
+
+
+ @Test
+ public void testRollbackOnFailure() throws NiFiClientException, IOException, InterruptedException {
+ final FlowRegistryClientEntity clientEntity = registerClient(new File("src/test/resources/versioned-flows"));
+
+ final ProcessGroupEntity imported = getClientUtil().importFlowFromRegistry("root", clientEntity.getId(), "test-flows", "flow-with-invalid-connection", 1);
+ assertNotNull(imported);
+ getClientUtil().assertFlowStaleAndUnmodified(imported.getId());
+
+ final VersionedFlowUpdateRequestEntity version2Result = getClientUtil().changeFlowVersion(imported.getId(), 2);
+ final String failureReason = version2Result.getRequest().getFailureReason();
+ assertNotNull(failureReason);
+
+ // Ensure that we're still on v1 of the flow and there are no local modifications
+ getClientUtil().assertFlowStaleAndUnmodified(imported.getId());
+
+ // Ensure that the processors still exist
+ final FlowDTO contents = getNifiClient().getFlowClient().getProcessGroup(imported.getId()).getProcessGroupFlow().getFlow();
+ assertEquals(1, contents.getProcessors().size());
+ }
+
+
@Test
public void testStartVersionControlThenImport() throws NiFiClientException, IOException {
final FlowRegistryClientEntity clientEntity = registerClient();
@@ -68,10 +143,15 @@ public class RegistryClientIT extends NiFiSystemIT {
}
private FlowRegistryClientEntity registerClient() throws NiFiClientException, IOException {
- final String clientName = String.format("FileRegistry-%s", UUID.randomUUID());
- final FlowRegistryClientEntity clientEntity = getClientUtil().createFlowRegistryClient(clientName);
final File storageDir = new File("target/flowRegistryStorage/" + getTestName().replace("\\(.*?\\)", ""));
Files.createDirectories(storageDir.toPath());
+
+ return registerClient(storageDir);
+ }
+
+ private FlowRegistryClientEntity registerClient(final File storageDir) throws NiFiClientException, IOException {
+ final String clientName = String.format("FileRegistry-%s", UUID.randomUUID());
+ final FlowRegistryClientEntity clientEntity = getClientUtil().createFlowRegistryClient(clientName);
getClientUtil().updateRegistryClientProperties(clientEntity, Collections.singletonMap("Directory", storageDir.getAbsolutePath()));
return clientEntity;
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/bootstrap.conf b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/bootstrap.conf
index 051ca4d936..86900d809e 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/bootstrap.conf
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/bootstrap.conf
@@ -27,7 +27,7 @@ java.arg.3=-Xmx512m
java.arg.14=-Djava.awt.headless=true
-#java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8002
+java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8002
# Disable Logback web shutdown hook using System property
java.arg.logbackShutdown=-DlogbackDisableServletContainerInitializer=true
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/logback.xml b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/logback.xml
index 9471514f10..1a62d56362 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/logback.xml
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/logback.xml
@@ -113,6 +113,7 @@
<logger name="org.apache.nifi.controller.reporting.LogComponentStatuses" level="ERROR" />
<logger name="org.apache.calcite.runtime.CalciteException" level="OFF" />
+ <logger name="deprecation" level="OFF" />
<logger name="org.apache.curator.framework.recipes.leader.LeaderSelector" level="OFF" />
<logger name="org.apache.curator.ConnectionState" level="OFF" />
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties
index 7c6426c67b..222206984e 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties
@@ -248,4 +248,4 @@ nifi.kerberos.spnego.authentication.expiration=12 hours
# external properties files for variable registry
# supports a comma delimited list of file locations
-nifi.variable.registry.properties=
\ No newline at end of file
+nifi.variable.registry.properties=
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/versioned-flows/test-flows/flow-with-invalid-connection/1/snapshot.json b/nifi-system-tests/nifi-system-test-suite/src/test/resources/versioned-flows/test-flows/flow-with-invalid-connection/1/snapshot.json
new file mode 100644
index 0000000000..4c2d4f0227
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/versioned-flows/test-flows/flow-with-invalid-connection/1/snapshot.json
@@ -0,0 +1,305 @@
+{
+ "externalControllerServices": {},
+ "flowContents": {
+ "comments": "",
+ "componentType": "PROCESS_GROUP",
+ "connections": [
+ {
+ "backPressureDataSizeThreshold": "1 GB",
+ "backPressureObjectThreshold": 10000,
+ "bends": [],
+ "componentType": "CONNECTION",
+ "destination": {
+ "groupId": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+ "id": "448da11f-ef3a-38d1-bdc2-2f1a112b2679",
+ "instanceIdentifier": "5bd1a091-0186-1000-d9a9-5003d6a3ad8d",
+ "name": "in",
+ "type": "INPUT_PORT"
+ },
+ "flowFileExpiration": "0 sec",
+ "groupIdentifier": "30330c01-185a-3217-b764-39aef8d2a05f",
+ "identifier": "ff659054-0dee-3d85-b88f-6a4fc53639bd",
+ "instanceIdentifier": "5bd1fbc3-0186-1000-ec2a-0600eb0f67c9",
+ "labelIndex": 1,
+ "loadBalanceCompression": "DO_NOT_COMPRESS",
+ "loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
+ "name": "",
+ "partitioningAttribute": "",
+ "prioritizers": [],
+ "selectedRelationships": [
+ "failure",
+ "success"
+ ],
+ "source": {
+ "comments": "",
+ "groupId": "30330c01-185a-3217-b764-39aef8d2a05f",
+ "id": "6d9c5768-a1ac-313d-b073-2eefa2712ba9",
+ "instanceIdentifier": "5bd1e3d3-0186-1000-2fd9-222383ab69ce",
+ "name": "ReplaceText",
+ "type": "PROCESSOR"
+ },
+ "zIndex": 0
+ }
+ ],
+ "controllerServices": [],
+ "defaultBackPressureDataSizeThreshold": "1 GB",
+ "defaultBackPressureObjectThreshold": 10000,
+ "defaultFlowFileExpiration": "0 sec",
+ "flowFileConcurrency": "UNBOUNDED",
+ "flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
+ "funnels": [],
+ "identifier": "30330c01-185a-3217-b764-39aef8d2a05f",
+ "inputPorts": [],
+ "instanceIdentifier": "5bd17e2a-0186-1000-48b9-aa19b73d5a66",
+ "labels": [],
+ "name": "parent",
+ "outputPorts": [],
+ "position": {
+ "x": 614.0,
+ "y": 462.0
+ },
+ "processGroups": [
+ {
+ "comments": "",
+ "componentType": "PROCESS_GROUP",
+ "connections": [
+ {
+ "backPressureDataSizeThreshold": "1 GB",
+ "backPressureObjectThreshold": 10000,
+ "bends": [],
+ "componentType": "CONNECTION",
+ "destination": {
+ "comments": "",
+ "groupId": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+ "id": "941ef216-74ef-3225-bf73-a9e8b167c0a8",
+ "instanceIdentifier": "5bd1afe6-0186-1000-f3ac-fcffc8b6ce2b",
+ "name": "UpdateAttribute",
+ "type": "PROCESSOR"
+ },
+ "flowFileExpiration": "0 sec",
+ "groupIdentifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+ "identifier": "776f1f03-e742-3287-841b-8aedb9ada176",
+ "instanceIdentifier": "5bd1bdd3-0186-1000-dc09-4c6d9bffa220",
+ "labelIndex": 1,
+ "loadBalanceCompression": "DO_NOT_COMPRESS",
+ "loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
+ "name": "",
+ "partitioningAttribute": "",
+ "prioritizers": [],
+ "selectedRelationships": [
+ ""
+ ],
+ "source": {
+ "groupId": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+ "id": "448da11f-ef3a-38d1-bdc2-2f1a112b2679",
+ "instanceIdentifier": "5bd1a091-0186-1000-d9a9-5003d6a3ad8d",
+ "name": "in",
+ "type": "INPUT_PORT"
+ },
+ "zIndex": 0
+ }
+ ],
+ "controllerServices": [],
+ "defaultBackPressureDataSizeThreshold": "1 GB",
+ "defaultBackPressureObjectThreshold": 10000,
+ "defaultFlowFileExpiration": "0 sec",
+ "flowFileConcurrency": "UNBOUNDED",
+ "flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
+ "funnels": [],
+ "groupIdentifier": "30330c01-185a-3217-b764-39aef8d2a05f",
+ "identifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+ "inputPorts": [
+ {
+ "allowRemoteAccess": false,
+ "componentType": "INPUT_PORT",
+ "concurrentlySchedulableTaskCount": 1,
+ "groupIdentifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+ "identifier": "448da11f-ef3a-38d1-bdc2-2f1a112b2679",
+ "instanceIdentifier": "5bd1a091-0186-1000-d9a9-5003d6a3ad8d",
+ "name": "in",
+ "position": {
+ "x": 784.0,
+ "y": 146.0
+ },
+ "scheduledState": "ENABLED",
+ "type": "INPUT_PORT"
+ }
+ ],
+ "instanceIdentifier": "5bd1937d-0186-1000-eba8-f831e822907c",
+ "labels": [],
+ "name": "child",
+ "outputPorts": [],
+ "position": {
+ "x": 824.0,
+ "y": 648.0
+ },
+ "processGroups": [],
+ "processors": [
+ {
+ "autoTerminatedRelationships": [],
+ "backoffMechanism": "PENALIZE_FLOWFILE",
+ "bulletinLevel": "WARN",
+ "bundle": {
+ "artifact": "nifi-update-attribute-nar",
+ "group": "org.apache.nifi",
+ "version": "1.16.0.2.1.4.0-53"
+ },
+ "comments": "",
+ "componentType": "PROCESSOR",
+ "concurrentlySchedulableTaskCount": 1,
+ "executionNode": "ALL",
+ "groupIdentifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+ "identifier": "941ef216-74ef-3225-bf73-a9e8b167c0a8",
+ "instanceIdentifier": "5bd1afe6-0186-1000-f3ac-fcffc8b6ce2b",
+ "maxBackoffPeriod": "10 mins",
+ "name": "UpdateAttribute",
+ "penaltyDuration": "30 sec",
+ "position": {
+ "x": 735.0,
+ "y": 310.0
+ },
+ "properties": {
+ "Store State": "Do not store state",
+ "canonical-value-lookup-cache-size": "100"
+ },
+ "propertyDescriptors": {
+ "Delete Attributes Expression": {
+ "displayName": "Delete Attributes Expression",
+ "identifiesControllerService": false,
+ "name": "Delete Attributes Expression",
+ "sensitive": false
+ },
+ "Store State": {
+ "displayName": "Store State",
+ "identifiesControllerService": false,
+ "name": "Store State",
+ "sensitive": false
+ },
+ "canonical-value-lookup-cache-size": {
+ "displayName": "Cache Value Lookup Cache Size",
+ "identifiesControllerService": false,
+ "name": "canonical-value-lookup-cache-size",
+ "sensitive": false
+ },
+ "Stateful Variables Initial Value": {
+ "displayName": "Stateful Variables Initial Value",
+ "identifiesControllerService": false,
+ "name": "Stateful Variables Initial Value",
+ "sensitive": false
+ }
+ },
+ "retriedRelationships": [],
+ "retryCount": 10,
+ "runDurationMillis": 0,
+ "scheduledState": "ENABLED",
+ "schedulingPeriod": "0 sec",
+ "schedulingStrategy": "TIMER_DRIVEN",
+ "style": {},
+ "type": "org.apache.nifi.processors.attributes.UpdateAttribute",
+ "yieldDuration": "1 sec"
+ }
+ ],
+ "remoteProcessGroups": [],
+ "variables": {}
+ }
+ ],
+ "processors": [
+ {
+ "autoTerminatedRelationships": [],
+ "backoffMechanism": "PENALIZE_FLOWFILE",
+ "bulletinLevel": "WARN",
+ "bundle": {
+ "artifact": "nifi-standard-nar",
+ "group": "org.apache.nifi",
+ "version": "1.16.0.2.1.4.0-53"
+ },
+ "comments": "",
+ "componentType": "PROCESSOR",
+ "concurrentlySchedulableTaskCount": 1,
+ "executionNode": "ALL",
+ "groupIdentifier": "30330c01-185a-3217-b764-39aef8d2a05f",
+ "identifier": "6d9c5768-a1ac-313d-b073-2eefa2712ba9",
+ "instanceIdentifier": "5bd1e3d3-0186-1000-2fd9-222383ab69ce",
+ "maxBackoffPeriod": "10 mins",
+ "name": "ReplaceText",
+ "penaltyDuration": "30 sec",
+ "position": {
+ "x": 848.0,
+ "y": 384.0
+ },
+ "properties": {
+ "Regular Expression": "(?s)(^.*$)",
+ "Replacement Value": "$1",
+ "Evaluation Mode": "Line-by-Line",
+ "Line-by-Line Evaluation Mode": "All",
+ "Character Set": "UTF-8",
+ "Maximum Buffer Size": "1 MB",
+ "Replacement Strategy": "Regex Replace"
+ },
+ "propertyDescriptors": {
+ "Regular Expression": {
+ "displayName": "Search Value",
+ "identifiesControllerService": false,
+ "name": "Regular Expression",
+ "sensitive": false
+ },
+ "Replacement Value": {
+ "displayName": "Replacement Value",
+ "identifiesControllerService": false,
+ "name": "Replacement Value",
+ "sensitive": false
+ },
+ "Evaluation Mode": {
+ "displayName": "Evaluation Mode",
+ "identifiesControllerService": false,
+ "name": "Evaluation Mode",
+ "sensitive": false
+ },
+ "Line-by-Line Evaluation Mode": {
+ "displayName": "Line-by-Line Evaluation Mode",
+ "identifiesControllerService": false,
+ "name": "Line-by-Line Evaluation Mode",
+ "sensitive": false
+ },
+ "Character Set": {
+ "displayName": "Character Set",
+ "identifiesControllerService": false,
+ "name": "Character Set",
+ "sensitive": false
+ },
+ "Maximum Buffer Size": {
+ "displayName": "Maximum Buffer Size",
+ "identifiesControllerService": false,
+ "name": "Maximum Buffer Size",
+ "sensitive": false
+ },
+ "Replacement Strategy": {
+ "displayName": "Replacement Strategy",
+ "identifiesControllerService": false,
+ "name": "Replacement Strategy",
+ "sensitive": false
+ }
+ },
+ "retriedRelationships": [],
+ "retryCount": 10,
+ "runDurationMillis": 0,
+ "scheduledState": "ENABLED",
+ "schedulingPeriod": "0 sec",
+ "schedulingStrategy": "TIMER_DRIVEN",
+ "style": {},
+ "type": "org.apache.nifi.processors.standard.ReplaceText",
+ "yieldDuration": "1 sec"
+ }
+ ],
+ "remoteProcessGroups": [],
+ "variables": {}
+ },
+ "flowEncodingVersion": "1.0",
+ "parameterContexts": {},
+ "snapshotMetadata": {
+ "author": "anonymous",
+ "comments": "",
+ "timestamp": 1676577758403,
+ "version": 1
+ }
+}
\ No newline at end of file
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/versioned-flows/test-flows/flow-with-invalid-connection/2/snapshot.json b/nifi-system-tests/nifi-system-test-suite/src/test/resources/versioned-flows/test-flows/flow-with-invalid-connection/2/snapshot.json
new file mode 100644
index 0000000000..d68c67c90e
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/versioned-flows/test-flows/flow-with-invalid-connection/2/snapshot.json
@@ -0,0 +1,156 @@
+{
+ "externalControllerServices": {},
+ "flowContents": {
+ "comments": "",
+ "componentType": "PROCESS_GROUP",
+ "connections": [
+ {
+ "backPressureDataSizeThreshold": "1 GB",
+ "backPressureObjectThreshold": 10000,
+ "bends": [],
+ "componentType": "CONNECTION",
+ "destination": {
+ "groupId": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+ "id": "448da11f-ef3a-38d1-bdc2-2f1a112b2679",
+ "instanceIdentifier": "5bd1a091-0186-1000-d9a9-5003d6a3ad8d",
+ "name": "in",
+ "type": "INPUT_PORT"
+ },
+ "flowFileExpiration": "0 sec",
+ "groupIdentifier": "30330c01-185a-3217-b764-39aef8d2a05f",
+ "identifier": "ff659054-0dee-3d85-b88f-6a4fc53639bd",
+ "instanceIdentifier": "5bd1fbc3-0186-1000-ec2a-0600eb0f67c9",
+ "labelIndex": 1,
+ "loadBalanceCompression": "DO_NOT_COMPRESS",
+ "loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
+ "name": "",
+ "partitioningAttribute": "",
+ "prioritizers": [],
+ "selectedRelationships": [
+ "failure",
+ "success"
+ ],
+ "source": {
+ "comments": "",
+ "groupId": "30330c01-185a-3217-b764-39aef8d2a05f",
+ "id": "6d9c5768-a1ac-313d-b073-2eefa2712ba9",
+ "instanceIdentifier": "5bd1e3d3-0186-1000-2fd9-222383ab69ce",
+ "name": "ReplaceText",
+ "type": "PROCESSOR"
+ },
+ "zIndex": 0
+ }
+ ],
+ "controllerServices": [],
+ "defaultBackPressureDataSizeThreshold": "1 GB",
+ "defaultBackPressureObjectThreshold": 10000,
+ "defaultFlowFileExpiration": "0 sec",
+ "flowFileConcurrency": "UNBOUNDED",
+ "flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
+ "funnels": [],
+ "identifier": "30330c01-185a-3217-b764-39aef8d2a05f",
+ "inputPorts": [],
+ "instanceIdentifier": "5bd17e2a-0186-1000-48b9-aa19b73d5a66",
+ "labels": [],
+ "name": "parent",
+ "outputPorts": [],
+ "position": {
+ "x": 614.0,
+ "y": 462.0
+ },
+ "processGroups": [
+ {
+ "comments": "",
+ "componentType": "PROCESS_GROUP",
+ "connections": [
+ {
+ "backPressureDataSizeThreshold": "1 GB",
+ "backPressureObjectThreshold": 10000,
+ "bends": [],
+ "componentType": "CONNECTION",
+ "destination": {
+ "comments": "",
+ "groupId": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+ "id": "941ef216-74ef-3225-bf73-a9e8b167c0a8",
+ "instanceIdentifier": "5bd1afe6-0186-1000-f3ac-fcffc8b6ce2b",
+ "name": "UpdateAttribute",
+ "type": "PROCESSOR"
+ },
+ "flowFileExpiration": "0 sec",
+ "groupIdentifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+ "identifier": "776f1f03-e742-3287-841b-8aedb9ada176",
+ "instanceIdentifier": "5bd1bdd3-0186-1000-dc09-4c6d9bffa220",
+ "labelIndex": 1,
+ "loadBalanceCompression": "DO_NOT_COMPRESS",
+ "loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
+ "name": "",
+ "partitioningAttribute": "",
+ "prioritizers": [],
+ "selectedRelationships": [
+ ""
+ ],
+ "source": {
+ "groupId": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+ "id": "448da11f-ef3a-38d1-bdc2-2f1a112b2679",
+ "instanceIdentifier": "5bd1a091-0186-1000-d9a9-5003d6a3ad8d",
+ "name": "in",
+ "type": "INPUT_PORT"
+ },
+ "zIndex": 0
+ }
+ ],
+ "controllerServices": [],
+ "defaultBackPressureDataSizeThreshold": "1 GB",
+ "defaultBackPressureObjectThreshold": 10000,
+ "defaultFlowFileExpiration": "0 sec",
+ "flowFileConcurrency": "UNBOUNDED",
+ "flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
+ "funnels": [],
+ "groupIdentifier": "30330c01-185a-3217-b764-39aef8d2a05f",
+ "identifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+ "inputPorts": [
+ {
+ "allowRemoteAccess": false,
+ "componentType": "INPUT_PORT",
+ "concurrentlySchedulableTaskCount": 1,
+ "groupIdentifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+ "identifier": "448da11f-ef3a-38d1-bdc2-2f1a112b2679",
+ "instanceIdentifier": "5bd1a091-0186-1000-d9a9-5003d6a3ad8d",
+ "name": "in",
+ "position": {
+ "x": 784.0,
+ "y": 146.0
+ },
+ "scheduledState": "ENABLED",
+ "type": "INPUT_PORT"
+ }
+ ],
+ "instanceIdentifier": "5bd1937d-0186-1000-eba8-f831e822907c",
+ "labels": [],
+ "name": "child",
+ "outputPorts": [],
+ "position": {
+ "x": 824.0,
+ "y": 648.0
+ },
+ "processGroups": [],
+ "processors": [
+ ],
+ "remoteProcessGroups": [],
+ "variables": {}
+ }
+ ],
+ "processors": [
+ ],
+ "remoteProcessGroups": [],
+ "variables": {}
+ },
+ "flowEncodingVersion": "1.0",
+ "parameterContexts": {},
+ "snapshotMetadata": {
+ "author": "anonymous",
+ "comments": "",
+ "timestamp": 1676577758403,
+ "version": 1
+ }
+}
\ No newline at end of file
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/versioned-flows/test-flows/port-moved-groups/1/snapshot.json b/nifi-system-tests/nifi-system-test-suite/src/test/resources/versioned-flows/test-flows/port-moved-groups/1/snapshot.json
new file mode 100644
index 0000000000..4c2d4f0227
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/versioned-flows/test-flows/port-moved-groups/1/snapshot.json
@@ -0,0 +1,305 @@
+{
+ "externalControllerServices": {},
+ "flowContents": {
+ "comments": "",
+ "componentType": "PROCESS_GROUP",
+ "connections": [
+ {
+ "backPressureDataSizeThreshold": "1 GB",
+ "backPressureObjectThreshold": 10000,
+ "bends": [],
+ "componentType": "CONNECTION",
+ "destination": {
+ "groupId": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+ "id": "448da11f-ef3a-38d1-bdc2-2f1a112b2679",
+ "instanceIdentifier": "5bd1a091-0186-1000-d9a9-5003d6a3ad8d",
+ "name": "in",
+ "type": "INPUT_PORT"
+ },
+ "flowFileExpiration": "0 sec",
+ "groupIdentifier": "30330c01-185a-3217-b764-39aef8d2a05f",
+ "identifier": "ff659054-0dee-3d85-b88f-6a4fc53639bd",
+ "instanceIdentifier": "5bd1fbc3-0186-1000-ec2a-0600eb0f67c9",
+ "labelIndex": 1,
+ "loadBalanceCompression": "DO_NOT_COMPRESS",
+ "loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
+ "name": "",
+ "partitioningAttribute": "",
+ "prioritizers": [],
+ "selectedRelationships": [
+ "failure",
+ "success"
+ ],
+ "source": {
+ "comments": "",
+ "groupId": "30330c01-185a-3217-b764-39aef8d2a05f",
+ "id": "6d9c5768-a1ac-313d-b073-2eefa2712ba9",
+ "instanceIdentifier": "5bd1e3d3-0186-1000-2fd9-222383ab69ce",
+ "name": "ReplaceText",
+ "type": "PROCESSOR"
+ },
+ "zIndex": 0
+ }
+ ],
+ "controllerServices": [],
+ "defaultBackPressureDataSizeThreshold": "1 GB",
+ "defaultBackPressureObjectThreshold": 10000,
+ "defaultFlowFileExpiration": "0 sec",
+ "flowFileConcurrency": "UNBOUNDED",
+ "flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
+ "funnels": [],
+ "identifier": "30330c01-185a-3217-b764-39aef8d2a05f",
+ "inputPorts": [],
+ "instanceIdentifier": "5bd17e2a-0186-1000-48b9-aa19b73d5a66",
+ "labels": [],
+ "name": "parent",
+ "outputPorts": [],
+ "position": {
+ "x": 614.0,
+ "y": 462.0
+ },
+ "processGroups": [
+ {
+ "comments": "",
+ "componentType": "PROCESS_GROUP",
+ "connections": [
+ {
+ "backPressureDataSizeThreshold": "1 GB",
+ "backPressureObjectThreshold": 10000,
+ "bends": [],
+ "componentType": "CONNECTION",
+ "destination": {
+ "comments": "",
+ "groupId": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+ "id": "941ef216-74ef-3225-bf73-a9e8b167c0a8",
+ "instanceIdentifier": "5bd1afe6-0186-1000-f3ac-fcffc8b6ce2b",
+ "name": "UpdateAttribute",
+ "type": "PROCESSOR"
+ },
+ "flowFileExpiration": "0 sec",
+ "groupIdentifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+ "identifier": "776f1f03-e742-3287-841b-8aedb9ada176",
+ "instanceIdentifier": "5bd1bdd3-0186-1000-dc09-4c6d9bffa220",
+ "labelIndex": 1,
+ "loadBalanceCompression": "DO_NOT_COMPRESS",
+ "loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
+ "name": "",
+ "partitioningAttribute": "",
+ "prioritizers": [],
+ "selectedRelationships": [
+ ""
+ ],
+ "source": {
+ "groupId": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+ "id": "448da11f-ef3a-38d1-bdc2-2f1a112b2679",
+ "instanceIdentifier": "5bd1a091-0186-1000-d9a9-5003d6a3ad8d",
+ "name": "in",
+ "type": "INPUT_PORT"
+ },
+ "zIndex": 0
+ }
+ ],
+ "controllerServices": [],
+ "defaultBackPressureDataSizeThreshold": "1 GB",
+ "defaultBackPressureObjectThreshold": 10000,
+ "defaultFlowFileExpiration": "0 sec",
+ "flowFileConcurrency": "UNBOUNDED",
+ "flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
+ "funnels": [],
+ "groupIdentifier": "30330c01-185a-3217-b764-39aef8d2a05f",
+ "identifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+ "inputPorts": [
+ {
+ "allowRemoteAccess": false,
+ "componentType": "INPUT_PORT",
+ "concurrentlySchedulableTaskCount": 1,
+ "groupIdentifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+ "identifier": "448da11f-ef3a-38d1-bdc2-2f1a112b2679",
+ "instanceIdentifier": "5bd1a091-0186-1000-d9a9-5003d6a3ad8d",
+ "name": "in",
+ "position": {
+ "x": 784.0,
+ "y": 146.0
+ },
+ "scheduledState": "ENABLED",
+ "type": "INPUT_PORT"
+ }
+ ],
+ "instanceIdentifier": "5bd1937d-0186-1000-eba8-f831e822907c",
+ "labels": [],
+ "name": "child",
+ "outputPorts": [],
+ "position": {
+ "x": 824.0,
+ "y": 648.0
+ },
+ "processGroups": [],
+ "processors": [
+ {
+ "autoTerminatedRelationships": [],
+ "backoffMechanism": "PENALIZE_FLOWFILE",
+ "bulletinLevel": "WARN",
+ "bundle": {
+ "artifact": "nifi-update-attribute-nar",
+ "group": "org.apache.nifi",
+ "version": "1.16.0.2.1.4.0-53"
+ },
+ "comments": "",
+ "componentType": "PROCESSOR",
+ "concurrentlySchedulableTaskCount": 1,
+ "executionNode": "ALL",
+ "groupIdentifier": "2d4624cb-407d-3c6c-be1a-3f913fb2cead",
+ "identifier": "941ef216-74ef-3225-bf73-a9e8b167c0a8",
+ "instanceIdentifier": "5bd1afe6-0186-1000-f3ac-fcffc8b6ce2b",
+ "maxBackoffPeriod": "10 mins",
+ "name": "UpdateAttribute",
+ "penaltyDuration": "30 sec",
+ "position": {
+ "x": 735.0,
+ "y": 310.0
+ },
+ "properties": {
+ "Store State": "Do not store state",
+ "canonical-value-lookup-cache-size": "100"
+ },
+ "propertyDescriptors": {
+ "Delete Attributes Expression": {
+ "displayName": "Delete Attributes Expression",
+ "identifiesControllerService": false,
+ "name": "Delete Attributes Expression",
+ "sensitive": false
+ },
+ "Store State": {
+ "displayName": "Store State",
+ "identifiesControllerService": false,
+ "name": "Store State",
+ "sensitive": false
+ },
+ "canonical-value-lookup-cache-size": {
+ "displayName": "Cache Value Lookup Cache Size",
+ "identifiesControllerService": false,
+ "name": "canonical-value-lookup-cache-size",
+ "sensitive": false
+ },
+ "Stateful Variables Initial Value": {
+ "displayName": "Stateful Variables Initial Value",
+ "identifiesControllerService": false,
+ "name": "Stateful Variables Initial Value",
+ "sensitive": false
+ }
+ },
+ "retriedRelationships": [],
+ "retryCount": 10,
+ "runDurationMillis": 0,
+ "scheduledState": "ENABLED",
+ "schedulingPeriod": "0 sec",
+ "schedulingStrategy": "TIMER_DRIVEN",
+ "style": {},
+ "type": "org.apache.nifi.processors.attributes.UpdateAttribute",
+ "yieldDuration": "1 sec"
+ }
+ ],
+ "remoteProcessGroups": [],
+ "variables": {}
+ }
+ ],
+ "processors": [
+ {
+ "autoTerminatedRelationships": [],
+ "backoffMechanism": "PENALIZE_FLOWFILE",
+ "bulletinLevel": "WARN",
+ "bundle": {
+ "artifact": "nifi-standard-nar",
+ "group": "org.apache.nifi",
+ "version": "1.16.0.2.1.4.0-53"
+ },
+ "comments": "",
+ "componentType": "PROCESSOR",
+ "concurrentlySchedulableTaskCount": 1,
+ "executionNode": "ALL",
+ "groupIdentifier": "30330c01-185a-3217-b764-39aef8d2a05f",
+ "identifier": "6d9c5768-a1ac-313d-b073-2eefa2712ba9",
+ "instanceIdentifier": "5bd1e3d3-0186-1000-2fd9-222383ab69ce",
+ "maxBackoffPeriod": "10 mins",
+ "name": "ReplaceText",
+ "penaltyDuration": "30 sec",
+ "position": {
+ "x": 848.0,
+ "y": 384.0
+ },
+ "properties": {
+ "Regular Expression": "(?s)(^.*$)",
+ "Replacement Value": "$1",
+ "Evaluation Mode": "Line-by-Line",
+ "Line-by-Line Evaluation Mode": "All",
+ "Character Set": "UTF-8",
+ "Maximum Buffer Size": "1 MB",
+ "Replacement Strategy": "Regex Replace"
+ },
+ "propertyDescriptors": {
+ "Regular Expression": {
+ "displayName": "Search Value",
+ "identifiesControllerService": false,
+ "name": "Regular Expression",
+ "sensitive": false
+ },
+ "Replacement Value": {
+ "displayName": "Replacement Value",
+ "identifiesControllerService": false,
+ "name": "Replacement Value",
+ "sensitive": false
+ },
+ "Evaluation Mode": {
+ "displayName": "Evaluation Mode",
+ "identifiesControllerService": false,
+ "name": "Evaluation Mode",
+ "sensitive": false
+ },
+ "Line-by-Line Evaluation Mode": {
+ "displayName": "Line-by-Line Evaluation Mode",
+ "identifiesControllerService": false,
+ "name": "Line-by-Line Evaluation Mode",
+ "sensitive": false
+ },
+ "Character Set": {
+ "displayName": "Character Set",
+ "identifiesControllerService": false,
+ "name": "Character Set",
+ "sensitive": false
+ },
+ "Maximum Buffer Size": {
+ "displayName": "Maximum Buffer Size",
+ "identifiesControllerService": false,
+ "name": "Maximum Buffer Size",
+ "sensitive": false
+ },
+ "Replacement Strategy": {
+ "displayName": "Replacement Strategy",
+ "identifiesControllerService": false,
+ "name": "Replacement Strategy",
+ "sensitive": false
+ }
+ },
+ "retriedRelationships": [],
+ "retryCount": 10,
+ "runDurationMillis": 0,
+ "scheduledState": "ENABLED",
+ "schedulingPeriod": "0 sec",
+ "schedulingStrategy": "TIMER_DRIVEN",
+ "style": {},
+ "type": "org.apache.nifi.processors.standard.ReplaceText",
+ "yieldDuration": "1 sec"
+ }
+ ],
+ "remoteProcessGroups": [],
+ "variables": {}
+ },
+ "flowEncodingVersion": "1.0",
+ "parameterContexts": {},
+ "snapshotMetadata": {
+ "author": "anonymous",
+ "comments": "",
+ "timestamp": 1676577758403,
+ "version": 1
+ }
+}
\ No newline at end of file
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/versioned-flows/test-flows/port-moved-groups/2/snapshot.json b/nifi-system-tests/nifi-system-test-suite/src/test/resources/versioned-flows/test-flows/port-moved-groups/2/snapshot.json
new file mode 100644
index 0000000000..bc80a0a74b
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/versioned-flows/test-flows/port-moved-groups/2/snapshot.json
@@ -0,0 +1 @@
+{"externalControllerServices":{},"flowContents":{"comments":"","componentType":"PROCESS_GROUP","connections":[{"backPressureDataSizeThreshold":"1 GB","backPressureObjectThreshold":10000,"bends":[],"componentType":"CONNECTION","destination":{"comments":"","groupId":"30330c01-185a-3217-b764-39aef8d2a05f","id":"941ef216-74ef-3225-bf73-a9e8b167c0a8","instanceIdentifier":"5bd1afe6-0186-1000-f3ac-fcffc8b6ce2b","name":"UpdateAttribute","type":"PROCESSOR"},"flowFileExpiration":"0 sec","groupIden [...]
\ No newline at end of file
diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessGroupClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessGroupClient.java
index 5d954e3f0b..edbf08fe99 100644
--- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessGroupClient.java
+++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessGroupClient.java
@@ -19,6 +19,7 @@ package org.apache.nifi.toolkit.cli.impl.client.nifi;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.CopySnippetRequestEntity;
+import org.apache.nifi.web.api.entity.FlowComparisonEntity;
import org.apache.nifi.web.api.entity.FlowEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupImportEntity;
@@ -70,4 +71,5 @@ public interface ProcessGroupClient {
FlowEntity copySnippet(String processGroupId, CopySnippetRequestEntity copySnippetRequestEntity)
throws NiFiClientException, IOException;
+ FlowComparisonEntity getLocalModifications(String processGroupId) throws NiFiClientException, IOException;
}
diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessGroupClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessGroupClient.java
index 5bf9b5fccc..4f2ad9e290 100644
--- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessGroupClient.java
+++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessGroupClient.java
@@ -23,6 +23,7 @@ import org.apache.nifi.toolkit.cli.impl.client.nifi.RequestConfig;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.CopySnippetRequestEntity;
+import org.apache.nifi.web.api.entity.FlowComparisonEntity;
import org.apache.nifi.web.api.entity.FlowEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupImportEntity;
@@ -333,4 +334,19 @@ public class JerseyProcessGroupClient extends AbstractJerseyClient implements Pr
});
}
+ @Override
+ public FlowComparisonEntity getLocalModifications(final String processGroupId) throws NiFiClientException, IOException {
+ if (StringUtils.isBlank(processGroupId)) {
+ throw new IllegalArgumentException("Process group id cannot be null or blank");
+ }
+
+ return executeAction("Error retrieving list of local flow modifications", () -> {
+ final WebTarget target = processGroupsTarget
+ .path("{id}/local-modifications")
+ .resolveTemplate("id", processGroupId);
+
+ return getRequestBuilder(target).get(FlowComparisonEntity.class);
+ });
+ }
+
}