You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/11/18 02:15:39 UTC

[GitHub] [nifi] greyp9 commented on a change in pull request #5514: NIFI-9069: Changed framework so that it serializes the dataflow into …

greyp9 commented on a change in pull request #5514:
URL: https://github.com/apache/nifi/pull/5514#discussion_r751747842



##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java
##########
@@ -136,4 +152,29 @@ public void error(final SAXParseException e) {
             throw new FlowSerializationException(ex);
         }
     }
+
+    private VersionedDataflow parseVersionedDataflow(final byte[] flow) {
+        if (flow == null || flow.length == 0) {
+            return null;
+        }
+
+        try {
+            final ObjectMapper objectMapper = new ObjectMapper();
+            objectMapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector(objectMapper.getTypeFactory()));
+            objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+            final VersionedDataflow versionedDataflow = objectMapper.readValue(flow, VersionedDataflow.class);
+            return versionedDataflow;
+        } catch (final Exception e) {
+            throw new FlowSerializationException("Could not parse flow as a VersionedDataflow", e);
+        }
+    }
+
+    public boolean isXml() {
+        if (flow == null || flow.length == 0) {
+            return true;
+        }
+
+        return flow[0] == '<';

Review comment:
       Would it also make sense to have an "isJson()" equivalent, and maybe an "isValid()" (expecting one or the other)?

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/GroupSynchronizationOptions.java
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.groups;
+
+public class GroupSynchronizationOptions {
+    private final ComponentIdGenerator componentIdGenerator;
+    private final ComponentScheduler componentScheduler;
+    private final PropertyDecryptor propertyDecryptor;
+    private final boolean ignoreLocalModifications;
+    private final boolean updateSettings;
+    private final boolean updateDescendantVersionedFlows;
+    private final boolean updateGroupVersionControlSnapshot;
+    private final boolean updateExistingVariables;
+    private final boolean updateRpgUrls;
+
+    private GroupSynchronizationOptions(final Builder builder) {
+        this.componentIdGenerator = builder.componentIdGenerator;
+        this.componentScheduler = builder.componentScheduler;
+        this.propertyDecryptor = builder.propertyDecryptor;
+        this.ignoreLocalModifications = builder.ignoreLocalModifications;
+        this.updateSettings = builder.updateSettings;
+        this.updateDescendantVersionedFlows = builder.updateDescendantVersionedFlows;
+        this.updateGroupVersionControlSnapshot = builder.updateGroupVersionControlSnapshot;
+        this.updateExistingVariables = builder.updateExistingVariables;
+        this.updateRpgUrls = builder.updateRpgUrls;
+    }
+
+    public ComponentIdGenerator getComponentIdGenerator() {
+        return componentIdGenerator;
+    }
+
+    public ComponentScheduler getComponentScheduler() {
+        return componentScheduler;
+    }
+
+    public boolean isIgnoreLocalModifications() {
+        return ignoreLocalModifications;
+    }
+
+    public boolean isUpdateSettings() {
+        return updateSettings;
+    }
+
+    public boolean isUpdateDescendantVersionedFlows() {
+        return updateDescendantVersionedFlows;
+    }
+
+    public boolean isUpdateGroupVersionControlSnapshot() {
+        return updateGroupVersionControlSnapshot;
+    }
+
+    public boolean isUpdateExistingVariables() {
+        return updateExistingVariables;
+    }
+
+    public boolean isUpdateRpgUrls() {
+        return updateRpgUrls;
+    }
+
+    public PropertyDecryptor getPropertyDecryptor() {
+        return propertyDecryptor;
+    }
+
+
+    public static class Builder {
+        private ComponentIdGenerator componentIdGenerator;
+        private ComponentScheduler componentScheduler;
+        private boolean ignoreLocalModifications = false;
+        private boolean updateSettings = true;
+        private boolean updateDescendantVersionedFlows = true;
+        private boolean updateGroupVersionControlSnapshot = true;
+        private boolean updateExistingVariables = false;
+        private boolean updateRpgUrls = false;
+        private PropertyDecryptor propertyDecryptor = value -> value;
+
+        /**
+         * Specifies the Component ID Generator to use for generating UUID's of components that are to be added to a ProcessGroup
+         * @param componentIdGenerator the ComponentIdGenerator to use
+         * @return the builder
+         */
+        public Builder componentIdGenerator(final ComponentIdGenerator componentIdGenerator) {
+            this.componentIdGenerator = componentIdGenerator;
+            return this;
+        }
+
+        /**
+         * Specifies the ComponentScheduler to use for starting connectable components
+         * @param componentScheduler the ComponentScheduler to use
+         * @return the builder
+         */
+        public Builder componentScheduler(final ComponentScheduler componentScheduler) {
+            this.componentScheduler = componentScheduler;
+            return this;
+        }
+
+        /**
+         * Specifies whether local modifications to a dataflow should prevent the flow from being updated
+         *
+         * @param ignore if <code>true</code>, the Process Group should be synchronized with the proposed VersionedProcessGroup even if it has local modifications.
+         * If <code>false</code>, an attempt to synchronize a Process Group with a proposed flow should fail
+         * @return the builder
+         */
+        public Builder ignoreLocalModifications(final boolean ignore) {
+            this.ignoreLocalModifications = ignore;
+            return this;
+        }
+
+        /**
+         * Specifies whether or not a Process Group's settings (e.g., name, position) should be updated
+         * @param updateSettings whether or not to update the Process Group's settings
+         * @return the builder
+         */
+        public Builder updateGroupSettings(final boolean updateSettings) {
+            this.updateSettings = updateSettings;
+            return this;
+        }
+
+        /**
+         * If a child Process Group is under version control, specifies whether or not the child should have its contents synchronized
+         * @param updateDescendantVersionedFlows <code>true</code> to synchronize child groups, <code>false</code> otherwise
+         * @return the builder
+         */
+        public Builder updateDescendantVersionedFlows(final boolean updateDescendantVersionedFlows) {
+            this.updateDescendantVersionedFlows = updateDescendantVersionedFlows;
+            return this;
+        }
+
+        /**
+         * When a Process Group is version controlled, it tracks whether or not there are any local modifications by comparing the current dataflow
+         * to a snapshot of what the Versioned Flow looks like. If this value is set to <code>true</code>, when the Process Group is synchronized
+         * with a VersionedProcessGroup, that VersionedProcessGroup will become the snapshot of what the Versioned Flow looks like. If <code>false</code>,
+         * the snapshot is not updated.
+         *
+         * @param updateGroupVersionControlSnapshot <code>true</code> to update the snapshot, <code>false</code> otherwise
+         * @return the builder
+         */
+        public Builder updateGroupVersionControlSnapshot(final boolean updateGroupVersionControlSnapshot) {
+            this.updateGroupVersionControlSnapshot = updateGroupVersionControlSnapshot;
+            return this;
+        }
+
+        /**
+         * Specifies whether or not existing variables in the Process Group's Variable Registry should be updated. If <code>false</code>, any Variable
+         * that exists in a Versioned Process Group will be added to the group's variable registry but existing variables will not be modified. If <code>true</code>,
+         * existing variables will also be updated
+         *
+         * @param updateExistingVariables whether or not to update existing variables
+         * @return the builder
+         */
+        public Builder updateExistingVariables(final boolean updateExistingVariables) {
+            this.updateExistingVariables = updateExistingVariables;
+            return this;
+        }
+
+        /**
+         * Specifies whether or not the URLs / "Target URIs" of a Remote Process Group that exists in both the Proposed flow and the current flow

Review comment:
       proposed

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
##########
@@ -198,17 +198,17 @@ public void cleanup() throws Exception {
 
     @Test
     public void testSynchronizeFlowWithReportingTaskAndProcessorReferencingControllerService() throws IOException {
-        final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(
+        final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(
                 PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties), nifiProperties, extensionManager);
 
-        // create a mock proposed data flow with the same auth fingerprint as the current authorizer
+        // create a mock proposed data flow with the same auth fingerprint as the current authoriz

Review comment:
       authorizer

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
##########
@@ -0,0 +1,2050 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.groups;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.connectable.Position;
+import org.apache.nifi.connectable.Size;
+import org.apache.nifi.controller.ComponentNode;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.PropertyConfiguration;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.Template;
+import org.apache.nifi.controller.exception.ProcessorInstantiationException;
+import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.LoadBalanceCompression;
+import org.apache.nifi.controller.queue.LoadBalanceStrategy;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.encrypt.EncryptionException;
+import org.apache.nifi.flow.BatchSize;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ComponentType;
+import org.apache.nifi.flow.ConnectableComponent;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedFlowCoordinates;
+import org.apache.nifi.flow.VersionedFunnel;
+import org.apache.nifi.flow.VersionedLabel;
+import org.apache.nifi.flow.VersionedPort;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flow.VersionedPropertyDescriptor;
+import org.apache.nifi.flow.VersionedRemoteGroupPort;
+import org.apache.nifi.flow.VersionedRemoteProcessGroup;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.parameter.Parameter;
+import org.apache.nifi.parameter.ParameterContext;
+import org.apache.nifi.parameter.ParameterDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.registry.VariableDescriptor;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.flow.FlowRegistry;
+import org.apache.nifi.registry.flow.StandardVersionControlInformation;
+import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowState;
+import org.apache.nifi.registry.flow.VersionedParameter;
+import org.apache.nifi.registry.flow.VersionedParameterContext;
+import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
+import org.apache.nifi.registry.flow.diff.DifferenceType;
+import org.apache.nifi.registry.flow.diff.FlowComparator;
+import org.apache.nifi.registry.flow.diff.FlowComparison;
+import org.apache.nifi.registry.flow.diff.FlowDifference;
+import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow;
+import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
+import org.apache.nifi.registry.flow.diff.StaticDifferenceDescriptor;
+import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
+import org.apache.nifi.remote.PublicPort;
+import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
+import org.apache.nifi.scheduling.ExecutionNode;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.util.FlowDifferenceFilters;
+import org.apache.nifi.web.ResourceNotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronizer {
+    private static final Logger LOG = LoggerFactory.getLogger(StandardProcessGroupSynchronizer.class);
+    private static final String TEMP_FUNNEL_ID_SUFFIX = "-temp-funnel";
+    public static final String ENC_PREFIX = "enc{";
+    public static final String ENC_SUFFIX = "}";
+
+    private final ProcessGroupSynchronizationContext context;
+    private final Set<String> updatedVersionedComponentIds = new HashSet<>();
+
+    private Set<String> preExistingVariables = new HashSet<>();
+    private GroupSynchronizationOptions syncOptions;
+
+    public StandardProcessGroupSynchronizer(final ProcessGroupSynchronizationContext context) {
+        this.context = context;
+    }
+
+    private void setPreExistingVariables(final Set<String> preExistingVariables) {
+        this.preExistingVariables = preExistingVariables;
+    }
+
+    private void setUpdatedVersionedComponentIds(final Set<String> updatedVersionedComponentIds) {
+        this.updatedVersionedComponentIds.clear();
+        this.updatedVersionedComponentIds.addAll(updatedVersionedComponentIds);
+    }
+
+    public void setSynchronizationOptions(final GroupSynchronizationOptions syncOptions) {
+        this.syncOptions = syncOptions;
+    }
+
+    @Override
+    public void synchronize(final ProcessGroup group, final VersionedFlowSnapshot proposedSnapshot, final GroupSynchronizationOptions options) throws ProcessorInstantiationException {
+
+        final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(context.getExtensionManager(), context.getFlowMappingOptions());
+        final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(group, context.getControllerServiceProvider(), context.getFlowRegistryClient(), true);
+
+        final ComparableDataFlow localFlow = new StandardComparableDataFlow("Currently Loaded Flow", versionedGroup);
+        final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Proposed Flow", proposedSnapshot.getFlowContents());
+
+        final PropertyDecryptor decryptor = options.getPropertyDecryptor();
+        final FlowComparator flowComparator = new StandardFlowComparator(proposedFlow, localFlow, group.getAncestorServiceIds(), new StaticDifferenceDescriptor(), decryptor::decrypt);
+        final FlowComparison flowComparison = flowComparator.compare();
+
+        updatedVersionedComponentIds.clear();
+        setSynchronizationOptions(options);
+
+        for (final FlowDifference diff : flowComparison.getDifferences()) {
+            if (FlowDifferenceFilters.isPropertyMissingFromGhostComponent(diff, context.getFlowManager())) {
+                continue;
+            }
+            if (FlowDifferenceFilters.isScheduledStateNew(diff)) {
+                continue;
+            }
+
+            // If this update adds a new Controller Service, then we need to check if the service already exists at a higher level
+            // and if so compare our VersionedControllerService to the existing service.
+            if (diff.getDifferenceType() == DifferenceType.COMPONENT_ADDED) {
+                final VersionedComponent component = diff.getComponentA() == null ? diff.getComponentB() : diff.getComponentA();
+                if (ComponentType.CONTROLLER_SERVICE == component.getComponentType()) {
+                    final ControllerServiceNode serviceNode = getVersionedControllerService(group, component.getIdentifier());
+                    if (serviceNode != null) {
+                        final VersionedControllerService versionedService = mapper.mapControllerService(serviceNode, context.getControllerServiceProvider(),
+                            Collections.singleton(serviceNode.getProcessGroupIdentifier()), new HashMap<>());
+                        final Set<FlowDifference> differences = flowComparator.compareControllerServices(versionedService, (VersionedControllerService) component);
+
+                        if (!differences.isEmpty()) {
+                            updatedVersionedComponentIds.add(component.getIdentifier());
+                        }
+
+                        continue;
+                    }
+                }
+            }
+
+            final VersionedComponent component = diff.getComponentA() == null ? diff.getComponentB() : diff.getComponentA();
+            updatedVersionedComponentIds.add(component.getIdentifier());
+
+            if (component.getComponentType() == ComponentType.REMOTE_INPUT_PORT || component.getComponentType() == ComponentType.REMOTE_OUTPUT_PORT) {
+                final String remoteGroupId = ((VersionedRemoteGroupPort) component).getRemoteGroupId();
+                updatedVersionedComponentIds.add(remoteGroupId);
+            }
+        }
+
+        if (LOG.isInfoEnabled()) {
+            final String differencesByLine = flowComparison.getDifferences().stream()
+                .map(FlowDifference::toString)
+                .collect(Collectors.joining("\n"));
+
+            LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", group, proposedSnapshot,
+                flowComparison.getDifferences().size(), differencesByLine);
+        }
+
+        final Set<String> knownVariables = getKnownVariableNames(group);
+
+        preExistingVariables.clear();
+
+        // If we don't want to update existing variables, we need to populate the pre-existing variables so that we know which variables already existed.
+        // We can't do this when updating the Variable Registry for a Process Group because variables are inherited, and the variables of the parent group
+        // may already have been updated when we get to the point of updating a child's Variable Registry. As a result, we build up a Set of all known
+        // Variables before we update the Variable Registries.
+        if (!options.isUpdateExistingVariables()) {
+            preExistingVariables.addAll(knownVariables);
+        }
+
+        synchronize(group, proposedSnapshot.getFlowContents(), proposedSnapshot.getParameterContexts());
+        group.onComponentModified();
+    }
+
+    private void synchronize(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, VersionedParameterContext> versionedParameterContexts)
+                    throws ProcessorInstantiationException {
+
+        // Some components, such as Processors, may have a Scheduled State of RUNNING in the proposed flow. However, if we
+        // transition the service into the RUNNING state, and then we need to update a Connection that is connected to it,
+        // updating the Connection will fail because the Connection's source & destination must both be stopped in order to
+        // update it. To avoid that, we simply pause the scheduler. Once all updates have been made, we will resume the scheduler.
+        context.getComponentScheduler().pause();
+
+        group.setComments(proposed.getComments());
+
+        if (syncOptions.isUpdateSettings()) {
+            if (proposed.getName() != null) {
+                group.setName(proposed.getName());
+            }
+
+            if (proposed.getPosition() != null) {
+                group.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
+            }
+        }
+
+        updateParameterContext(group, proposed, versionedParameterContexts, context.getComponentIdGenerator());
+        updateVariableRegistry(group, proposed);
+
+        final FlowFileConcurrency flowFileConcurrency = proposed.getFlowFileConcurrency() == null ? FlowFileConcurrency.UNBOUNDED :
+            FlowFileConcurrency.valueOf(proposed.getFlowFileConcurrency());
+        group.setFlowFileConcurrency(flowFileConcurrency);
+
+        final FlowFileOutboundPolicy outboundPolicy = proposed.getFlowFileOutboundPolicy() == null ? FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE :
+            FlowFileOutboundPolicy.valueOf(proposed.getFlowFileOutboundPolicy());
+        group.setFlowFileOutboundPolicy(outboundPolicy);
+
+        group.setDefaultFlowFileExpiration(proposed.getDefaultFlowFileExpiration());
+        group.setDefaultBackPressureObjectThreshold(proposed.getDefaultBackPressureObjectThreshold());
+        group.setDefaultBackPressureDataSizeThreshold(proposed.getDefaultBackPressureDataSizeThreshold());
+
+        final VersionedFlowCoordinates remoteCoordinates = proposed.getVersionedFlowCoordinates();
+        if (remoteCoordinates == null) {
+            group.disconnectVersionControl(false);
+        } else {
+            final String registryId = context.getFlowRegistryClient().getFlowRegistryId(remoteCoordinates.getRegistryUrl());
+            final String bucketId = remoteCoordinates.getBucketId();
+            final String flowId = remoteCoordinates.getFlowId();
+            final int version = remoteCoordinates.getVersion();
+
+            final FlowRegistry flowRegistry = context.getFlowRegistryClient().getFlowRegistry(registryId);
+            final String registryName = flowRegistry == null ? registryId : flowRegistry.getName();
+
+            final VersionedFlowState flowState;
+            if (remoteCoordinates.getLatest() == null) {
+                flowState = VersionedFlowState.SYNC_FAILURE;
+            } else {
+                flowState = remoteCoordinates.getLatest() ? VersionedFlowState.UP_TO_DATE : VersionedFlowState.STALE;
+            }
+
+            final VersionControlInformation vci = new StandardVersionControlInformation.Builder()
+                .registryId(registryId)
+                .registryName(registryName)
+                .bucketId(bucketId)
+                .bucketName(bucketId)
+                .flowId(flowId)
+                .flowName(flowId)
+                .version(version)
+                .flowSnapshot(syncOptions.isUpdateGroupVersionControlSnapshot() ? proposed : null)
+                .status(new StandardVersionedFlowStatus(flowState, flowState.getDescription()))
+                .build();
+
+            group.setVersionControlInformation(vci, Collections.emptyMap());
+        }
+
+        // In order to properly update all of the components, we have to follow a specific order of operations, in order to ensure that
+        // we don't try to perform illegal operations like removing a Processor that has an incoming connection (which would throw an
+        // IllegalStateException and fail).
+        //
+        // The sequence of steps / order of operations are as follows:
+        //
+        // 1. Remove any Controller Services that do not exist in the proposed group
+        // 2. Add any Controller Services that are in the proposed group that are not in the current flow
+        // 3. Update Controller Services to match those in the proposed group
+        // 4. Remove any connections that do not exist in the proposed group
+        // 5. For any connection that does exist, if the proposed group has a different destination for the connection, update the destination.
+        //    If the new destination does not yet exist in the flow, set the destination as some temporary component.
+        // 6. Remove any other components that do not exist in the proposed group.
+        // 7. Add any components, other than Connections, that exist in the proposed group but not in the current flow
+        // 8. Update components, other than Connections, to match those in the proposed group
+        // 9. Add connections that exist in the proposed group that are not in the current flow
+        // 10. Update connections to match those in the proposed group
+        // 11. Delete the temporary destination that was created above
+
+        // Keep track of any processors that have been updated to have auto-terminated relationships so that we can set those
+        // auto-terminated relationships after we've handled creating/deleting necessary connections.
+        final Map<ProcessorNode, Set<Relationship>> autoTerminatedRelationships = new HashMap<>();
+
+        // During the flow update, we will use temporary names for process group ports. This is because port names must be
+        // unique within a process group, but during an update we might temporarily be in a state where two ports have the same name.
+        // For example, if a process group update involves removing/renaming port A, and then adding/updating port B where B is given
+        // A's former name. This is a valid state by the end of the flow update, but for a brief moment there may be two ports with the
+        // same name. To avoid this conflict, we keep the final names in a map indexed by port id, use a temporary name for each port
+        // during the update, and after all ports have been added/updated/removed, we set the final names on all ports.
+        final Map<Port, String> proposedPortFinalNames = new HashMap<>();
+
+        // Controller Services
+        final Map<String, ControllerServiceNode> controllerServicesByVersionedId = componentsById(group, grp -> grp.getControllerServices(false),
+            ControllerServiceNode::getIdentifier, ControllerServiceNode::getVersionedComponentId);
+        removeMissingControllerServices(group, proposed, controllerServicesByVersionedId);
+        synchronizeControllerServices(group, proposed, controllerServicesByVersionedId);
+
+        // Remove any connections that are not in the Proposed Process Group
+        // Connections must be the first thing to remove, not the last. Otherwise, we will fail
+        // to remove a component if it has a connection going to it!
+        final Map<String, Connection> connectionsByVersionedId = componentsById(group, ProcessGroup::getConnections, Connection::getIdentifier, Connection::getVersionedComponentId);
+        removeMissingConnections(group, proposed, connectionsByVersionedId);
+
+        // Before we remove other components, we have to ensure that the Connections have the appropriate destinations. Otherwise, we could have a situation
+        // where Connection A used to have a destination of B but now has a destination of C, which doesn't exist yet. And B doesn't exist in the new flow.
+        // This is a problem because we cannot remove B, since it has an incoming Connection. And we can't change the destination to C because C hasn't been
+        // added yet. As a result, we need a temporary location to set as the Connection's destination. So we create a Funnel for this and then we can update
+        // all Connections to have the appropriate destinations.
+        final Set<String> connectionsWithTempDestination = updateConnectionDestinations(group, proposed, connectionsByVersionedId);
+
+        try {
+            final Map<String, Funnel> funnelsByVersionedId = componentsById(group, ProcessGroup::getFunnels);
+            final Map<String, ProcessorNode> processorsByVersionedId = componentsById(group, ProcessGroup::getProcessors);
+            final Map<String, Port> inputPortsByVersionedId = componentsById(group, ProcessGroup::getInputPorts);
+            final Map<String, Port> outputPortsByVersionedId = componentsById(group, ProcessGroup::getOutputPorts);
+            final Map<String, Label> labelsByVersionedId = componentsById(group, ProcessGroup::getLabels, Label::getIdentifier, Label::getVersionedComponentId);
+            final Map<String, RemoteProcessGroup> rpgsByVersionedId = componentsById(group, ProcessGroup::getRemoteProcessGroups,
+                RemoteProcessGroup::getIdentifier, RemoteProcessGroup::getVersionedComponentId);
+            final Map<String, ProcessGroup> childGroupsByVersionedId = componentsById(group, ProcessGroup::getProcessGroups, ProcessGroup::getIdentifier, ProcessGroup::getVersionedComponentId);
+
+            removeMissingProcessors(group, proposed, processorsByVersionedId);
+            removeMissingFunnels(group, proposed, funnelsByVersionedId);
+            removeMissingInputPorts(group, proposed, inputPortsByVersionedId);
+            removeMissingOutputPorts(group, proposed, outputPortsByVersionedId);
+            removeMissingLabels(group, proposed, labelsByVersionedId);
+            removeMissingRpg(group, proposed, rpgsByVersionedId);
+            removeMissingChildGroups(group, proposed, childGroupsByVersionedId);
+
+            // Synchronize Child Process Groups
+            synchronizeChildGroups(group, proposed, versionedParameterContexts, childGroupsByVersionedId);
+
+            synchronizeFunnels(group, proposed, funnelsByVersionedId);
+            synchronizeInputPorts(group, proposed, proposedPortFinalNames, inputPortsByVersionedId);
+            synchronizeOutputPorts(group, proposed, proposedPortFinalNames, outputPortsByVersionedId);
+            synchronizeLabels(group, proposed, labelsByVersionedId);
+            synchronizeProcessors(group, proposed, autoTerminatedRelationships, processorsByVersionedId);
+            synchronizeRemoteGroups(group, proposed, rpgsByVersionedId);
+        } finally {
+            // Make sure that we reset the connections
+            restoreConnectionDestinations(group, proposed, connectionsByVersionedId, connectionsWithTempDestination);
+            removeTemporaryFunnel(group);
+        }
+
+        // We can now add in any necessary connections, since all connectable components have now been created.
+        synchronizeConnections(group, proposed, connectionsByVersionedId);
+
+        // Once the appropriate connections have been removed, we may now update Processors' auto-terminated relationships.
+        // We cannot do this above, in the 'updateProcessor' call because if a connection is removed and changed to auto-terminated,
+        // then updating this in the updateProcessor call above would attempt to set the Relationship to being auto-terminated while a
+        // Connection for that relationship exists. This will throw an Exception.
+        autoTerminatedRelationships.forEach(ProcessorNode::setAutoTerminatedRelationships);
+
+        // All ports have now been added/removed as necessary. We can now resolve the port names.
+        updatePortsToFinalNames(proposedPortFinalNames);
+
+        // Start all components that are queued up to be started now
+        context.getComponentScheduler().resume();
+    }
+
+    private void synchronizeChildGroups(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, VersionedParameterContext> versionedParameterContexts,
+                                        final Map<String, ProcessGroup> childGroupsByVersionedId) throws ProcessorInstantiationException {
+
+        for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) {
+            final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier());
+            final VersionedFlowCoordinates childCoordinates = proposedChildGroup.getVersionedFlowCoordinates();
+
+            // if there is a nested process group that is versioned controlled, make sure get the param contexts that go with that snapshot

Review comment:
       version controlled

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
##########
@@ -0,0 +1,2050 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.groups;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.connectable.Position;
+import org.apache.nifi.connectable.Size;
+import org.apache.nifi.controller.ComponentNode;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.PropertyConfiguration;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.Template;
+import org.apache.nifi.controller.exception.ProcessorInstantiationException;
+import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.LoadBalanceCompression;
+import org.apache.nifi.controller.queue.LoadBalanceStrategy;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.encrypt.EncryptionException;
+import org.apache.nifi.flow.BatchSize;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ComponentType;
+import org.apache.nifi.flow.ConnectableComponent;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedFlowCoordinates;
+import org.apache.nifi.flow.VersionedFunnel;
+import org.apache.nifi.flow.VersionedLabel;
+import org.apache.nifi.flow.VersionedPort;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flow.VersionedPropertyDescriptor;
+import org.apache.nifi.flow.VersionedRemoteGroupPort;
+import org.apache.nifi.flow.VersionedRemoteProcessGroup;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.parameter.Parameter;
+import org.apache.nifi.parameter.ParameterContext;
+import org.apache.nifi.parameter.ParameterDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.registry.VariableDescriptor;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.flow.FlowRegistry;
+import org.apache.nifi.registry.flow.StandardVersionControlInformation;
+import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowState;
+import org.apache.nifi.registry.flow.VersionedParameter;
+import org.apache.nifi.registry.flow.VersionedParameterContext;
+import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
+import org.apache.nifi.registry.flow.diff.DifferenceType;
+import org.apache.nifi.registry.flow.diff.FlowComparator;
+import org.apache.nifi.registry.flow.diff.FlowComparison;
+import org.apache.nifi.registry.flow.diff.FlowDifference;
+import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow;
+import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
+import org.apache.nifi.registry.flow.diff.StaticDifferenceDescriptor;
+import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
+import org.apache.nifi.remote.PublicPort;
+import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
+import org.apache.nifi.scheduling.ExecutionNode;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.util.FlowDifferenceFilters;
+import org.apache.nifi.web.ResourceNotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronizer {
+    private static final Logger LOG = LoggerFactory.getLogger(StandardProcessGroupSynchronizer.class);
+    private static final String TEMP_FUNNEL_ID_SUFFIX = "-temp-funnel";
+    public static final String ENC_PREFIX = "enc{";
+    public static final String ENC_SUFFIX = "}";
+
+    private final ProcessGroupSynchronizationContext context;
+    private final Set<String> updatedVersionedComponentIds = new HashSet<>();
+
+    private Set<String> preExistingVariables = new HashSet<>();
+    private GroupSynchronizationOptions syncOptions;
+
+    public StandardProcessGroupSynchronizer(final ProcessGroupSynchronizationContext context) {
+        this.context = context;
+    }
+
+    private void setPreExistingVariables(final Set<String> preExistingVariables) {
+        this.preExistingVariables = preExistingVariables;
+    }
+
+    private void setUpdatedVersionedComponentIds(final Set<String> updatedVersionedComponentIds) {
+        this.updatedVersionedComponentIds.clear();
+        this.updatedVersionedComponentIds.addAll(updatedVersionedComponentIds);
+    }
+
+    public void setSynchronizationOptions(final GroupSynchronizationOptions syncOptions) {
+        this.syncOptions = syncOptions;
+    }
+
+    @Override
+    public void synchronize(final ProcessGroup group, final VersionedFlowSnapshot proposedSnapshot, final GroupSynchronizationOptions options) throws ProcessorInstantiationException {
+
+        final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(context.getExtensionManager(), context.getFlowMappingOptions());
+        final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(group, context.getControllerServiceProvider(), context.getFlowRegistryClient(), true);
+
+        final ComparableDataFlow localFlow = new StandardComparableDataFlow("Currently Loaded Flow", versionedGroup);
+        final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Proposed Flow", proposedSnapshot.getFlowContents());
+
+        final PropertyDecryptor decryptor = options.getPropertyDecryptor();
+        final FlowComparator flowComparator = new StandardFlowComparator(proposedFlow, localFlow, group.getAncestorServiceIds(), new StaticDifferenceDescriptor(), decryptor::decrypt);
+        final FlowComparison flowComparison = flowComparator.compare();
+
+        updatedVersionedComponentIds.clear();
+        setSynchronizationOptions(options);
+
+        for (final FlowDifference diff : flowComparison.getDifferences()) {
+            if (FlowDifferenceFilters.isPropertyMissingFromGhostComponent(diff, context.getFlowManager())) {
+                continue;
+            }
+            if (FlowDifferenceFilters.isScheduledStateNew(diff)) {
+                continue;
+            }
+
+            // If this update adds a new Controller Service, then we need to check if the service already exists at a higher level
+            // and if so compare our VersionedControllerService to the existing service.
+            if (diff.getDifferenceType() == DifferenceType.COMPONENT_ADDED) {
+                final VersionedComponent component = diff.getComponentA() == null ? diff.getComponentB() : diff.getComponentA();
+                if (ComponentType.CONTROLLER_SERVICE == component.getComponentType()) {
+                    final ControllerServiceNode serviceNode = getVersionedControllerService(group, component.getIdentifier());
+                    if (serviceNode != null) {
+                        final VersionedControllerService versionedService = mapper.mapControllerService(serviceNode, context.getControllerServiceProvider(),
+                            Collections.singleton(serviceNode.getProcessGroupIdentifier()), new HashMap<>());
+                        final Set<FlowDifference> differences = flowComparator.compareControllerServices(versionedService, (VersionedControllerService) component);
+
+                        if (!differences.isEmpty()) {
+                            updatedVersionedComponentIds.add(component.getIdentifier());
+                        }
+
+                        continue;
+                    }
+                }
+            }
+
+            final VersionedComponent component = diff.getComponentA() == null ? diff.getComponentB() : diff.getComponentA();
+            updatedVersionedComponentIds.add(component.getIdentifier());
+
+            if (component.getComponentType() == ComponentType.REMOTE_INPUT_PORT || component.getComponentType() == ComponentType.REMOTE_OUTPUT_PORT) {
+                final String remoteGroupId = ((VersionedRemoteGroupPort) component).getRemoteGroupId();
+                updatedVersionedComponentIds.add(remoteGroupId);
+            }
+        }
+
+        if (LOG.isInfoEnabled()) {
+            final String differencesByLine = flowComparison.getDifferences().stream()
+                .map(FlowDifference::toString)
+                .collect(Collectors.joining("\n"));
+
+            LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", group, proposedSnapshot,
+                flowComparison.getDifferences().size(), differencesByLine);
+        }
+
+        final Set<String> knownVariables = getKnownVariableNames(group);
+
+        preExistingVariables.clear();
+
+        // If we don't want to update existing variables, we need to populate the pre-existing variables so that we know which variables already existed.
+        // We can't do this when updating the Variable Registry for a Process Group because variables are inherited, and the variables of the parent group
+        // may already have been updated when we get to the point of updating a child's Variable Registry. As a result, we build up a Set of all known
+        // Variables before we update the Variable Registries.
+        if (!options.isUpdateExistingVariables()) {
+            preExistingVariables.addAll(knownVariables);
+        }
+
+        synchronize(group, proposedSnapshot.getFlowContents(), proposedSnapshot.getParameterContexts());
+        group.onComponentModified();
+    }
+
+    private void synchronize(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, VersionedParameterContext> versionedParameterContexts)
+                    throws ProcessorInstantiationException {
+
+        // Some components, such as Processors, may have a Scheduled State of RUNNING in the proposed flow. However, if we
+        // transition the service into the RUNNING state, and then we need to update a Connection that is connected to it,
+        // updating the Connection will fail because the Connection's source & destination must both be stopped in order to
+        // update it. To avoid that, we simply pause the scheduler. Once all updates have been made, we will resume the scheduler.
+        context.getComponentScheduler().pause();
+
+        group.setComments(proposed.getComments());
+
+        if (syncOptions.isUpdateSettings()) {
+            if (proposed.getName() != null) {
+                group.setName(proposed.getName());
+            }
+
+            if (proposed.getPosition() != null) {
+                group.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
+            }
+        }
+
+        updateParameterContext(group, proposed, versionedParameterContexts, context.getComponentIdGenerator());
+        updateVariableRegistry(group, proposed);
+
+        final FlowFileConcurrency flowFileConcurrency = proposed.getFlowFileConcurrency() == null ? FlowFileConcurrency.UNBOUNDED :
+            FlowFileConcurrency.valueOf(proposed.getFlowFileConcurrency());
+        group.setFlowFileConcurrency(flowFileConcurrency);
+
+        final FlowFileOutboundPolicy outboundPolicy = proposed.getFlowFileOutboundPolicy() == null ? FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE :
+            FlowFileOutboundPolicy.valueOf(proposed.getFlowFileOutboundPolicy());
+        group.setFlowFileOutboundPolicy(outboundPolicy);
+
+        group.setDefaultFlowFileExpiration(proposed.getDefaultFlowFileExpiration());
+        group.setDefaultBackPressureObjectThreshold(proposed.getDefaultBackPressureObjectThreshold());
+        group.setDefaultBackPressureDataSizeThreshold(proposed.getDefaultBackPressureDataSizeThreshold());
+
+        final VersionedFlowCoordinates remoteCoordinates = proposed.getVersionedFlowCoordinates();
+        if (remoteCoordinates == null) {
+            group.disconnectVersionControl(false);
+        } else {
+            final String registryId = context.getFlowRegistryClient().getFlowRegistryId(remoteCoordinates.getRegistryUrl());
+            final String bucketId = remoteCoordinates.getBucketId();
+            final String flowId = remoteCoordinates.getFlowId();
+            final int version = remoteCoordinates.getVersion();
+
+            final FlowRegistry flowRegistry = context.getFlowRegistryClient().getFlowRegistry(registryId);
+            final String registryName = flowRegistry == null ? registryId : flowRegistry.getName();
+
+            final VersionedFlowState flowState;
+            if (remoteCoordinates.getLatest() == null) {
+                flowState = VersionedFlowState.SYNC_FAILURE;
+            } else {
+                flowState = remoteCoordinates.getLatest() ? VersionedFlowState.UP_TO_DATE : VersionedFlowState.STALE;
+            }
+
+            final VersionControlInformation vci = new StandardVersionControlInformation.Builder()
+                .registryId(registryId)
+                .registryName(registryName)
+                .bucketId(bucketId)
+                .bucketName(bucketId)
+                .flowId(flowId)
+                .flowName(flowId)
+                .version(version)
+                .flowSnapshot(syncOptions.isUpdateGroupVersionControlSnapshot() ? proposed : null)
+                .status(new StandardVersionedFlowStatus(flowState, flowState.getDescription()))
+                .build();
+
+            group.setVersionControlInformation(vci, Collections.emptyMap());
+        }
+
+        // In order to properly update all of the components, we have to follow a specific order of operations, in order to ensure that
+        // we don't try to perform illegal operations like removing a Processor that has an incoming connection (which would throw an
+        // IllegalStateException and fail).
+        //
+        // The sequence of steps / order of operations are as follows:
+        //
+        // 1. Remove any Controller Services that do not exist in the proposed group
+        // 2. Add any Controller Services that are in the proposed group that are not in the current flow
+        // 3. Update Controller Services to match those in the proposed group
+        // 4. Remove any connections that do not exist in the proposed group
+        // 5. For any connection that does exist, if the proposed group has a different destination for the connection, update the destination.
+        //    If the new destination does not yet exist in the flow, set the destination as some temporary component.
+        // 6. Remove any other components that do not exist in the proposed group.
+        // 7. Add any components, other than Connections, that exist in the proposed group but not in the current flow
+        // 8. Update components, other than Connections, to match those in the proposed group
+        // 9. Add connections that exist in the proposed group that are not in the current flow
+        // 10. Update connections to match those in the proposed group
+        // 11. Delete the temporary destination that was created above
+
+        // Keep track of any processors that have been updated to have auto-terminated relationships so that we can set those
+        // auto-terminated relationships after we've handled creating/deleting necessary connections.
+        final Map<ProcessorNode, Set<Relationship>> autoTerminatedRelationships = new HashMap<>();
+
+        // During the flow update, we will use temporary names for process group ports. This is because port names must be
+        // unique within a process group, but during an update we might temporarily be in a state where two ports have the same name.
+        // For example, if a process group update involves removing/renaming port A, and then adding/updating port B where B is given
+        // A's former name. This is a valid state by the end of the flow update, but for a brief moment there may be two ports with the
+        // same name. To avoid this conflict, we keep the final names in a map indexed by port id, use a temporary name for each port
+        // during the update, and after all ports have been added/updated/removed, we set the final names on all ports.
+        final Map<Port, String> proposedPortFinalNames = new HashMap<>();
+
+        // Controller Services
+        final Map<String, ControllerServiceNode> controllerServicesByVersionedId = componentsById(group, grp -> grp.getControllerServices(false),
+            ControllerServiceNode::getIdentifier, ControllerServiceNode::getVersionedComponentId);
+        removeMissingControllerServices(group, proposed, controllerServicesByVersionedId);
+        synchronizeControllerServices(group, proposed, controllerServicesByVersionedId);
+
+        // Remove any connections that are not in the Proposed Process Group
+        // Connections must be the first thing to remove, not the last. Otherwise, we will fail
+        // to remove a component if it has a connection going to it!
+        final Map<String, Connection> connectionsByVersionedId = componentsById(group, ProcessGroup::getConnections, Connection::getIdentifier, Connection::getVersionedComponentId);
+        removeMissingConnections(group, proposed, connectionsByVersionedId);
+
+        // Before we remove other components, we have to ensure that the Connections have the appropriate destinations. Otherwise, we could have a situation
+        // where Connection A used to have a destination of B but now has a destination of C, which doesn't exist yet. And B doesn't exist in the new flow.
+        // This is a problem because we cannot remove B, since it has an incoming Connection. And we can't change the destination to C because C hasn't been
+        // added yet. As a result, we need a temporary location to set as the Connection's destination. So we create a Funnel for this and then we can update
+        // all Connections to have the appropriate destinations.
+        final Set<String> connectionsWithTempDestination = updateConnectionDestinations(group, proposed, connectionsByVersionedId);
+
+        try {
+            final Map<String, Funnel> funnelsByVersionedId = componentsById(group, ProcessGroup::getFunnels);
+            final Map<String, ProcessorNode> processorsByVersionedId = componentsById(group, ProcessGroup::getProcessors);
+            final Map<String, Port> inputPortsByVersionedId = componentsById(group, ProcessGroup::getInputPorts);
+            final Map<String, Port> outputPortsByVersionedId = componentsById(group, ProcessGroup::getOutputPorts);
+            final Map<String, Label> labelsByVersionedId = componentsById(group, ProcessGroup::getLabels, Label::getIdentifier, Label::getVersionedComponentId);
+            final Map<String, RemoteProcessGroup> rpgsByVersionedId = componentsById(group, ProcessGroup::getRemoteProcessGroups,
+                RemoteProcessGroup::getIdentifier, RemoteProcessGroup::getVersionedComponentId);
+            final Map<String, ProcessGroup> childGroupsByVersionedId = componentsById(group, ProcessGroup::getProcessGroups, ProcessGroup::getIdentifier, ProcessGroup::getVersionedComponentId);
+
+            removeMissingProcessors(group, proposed, processorsByVersionedId);
+            removeMissingFunnels(group, proposed, funnelsByVersionedId);
+            removeMissingInputPorts(group, proposed, inputPortsByVersionedId);
+            removeMissingOutputPorts(group, proposed, outputPortsByVersionedId);
+            removeMissingLabels(group, proposed, labelsByVersionedId);
+            removeMissingRpg(group, proposed, rpgsByVersionedId);
+            removeMissingChildGroups(group, proposed, childGroupsByVersionedId);
+
+            // Synchronize Child Process Groups
+            synchronizeChildGroups(group, proposed, versionedParameterContexts, childGroupsByVersionedId);
+
+            synchronizeFunnels(group, proposed, funnelsByVersionedId);
+            synchronizeInputPorts(group, proposed, proposedPortFinalNames, inputPortsByVersionedId);
+            synchronizeOutputPorts(group, proposed, proposedPortFinalNames, outputPortsByVersionedId);
+            synchronizeLabels(group, proposed, labelsByVersionedId);
+            synchronizeProcessors(group, proposed, autoTerminatedRelationships, processorsByVersionedId);
+            synchronizeRemoteGroups(group, proposed, rpgsByVersionedId);
+        } finally {
+            // Make sure that we reset the connections
+            restoreConnectionDestinations(group, proposed, connectionsByVersionedId, connectionsWithTempDestination);
+            removeTemporaryFunnel(group);
+        }
+
+        // We can now add in any necessary connections, since all connectable components have now been created.
+        synchronizeConnections(group, proposed, connectionsByVersionedId);
+
+        // Once the appropriate connections have been removed, we may now update Processors' auto-terminated relationships.
+        // We cannot do this above, in the 'updateProcessor' call because if a connection is removed and changed to auto-terminated,
+        // then updating this in the updateProcessor call above would attempt to set the Relationship to being auto-terminated while a
+        // Connection for that relationship exists. This will throw an Exception.
+        autoTerminatedRelationships.forEach(ProcessorNode::setAutoTerminatedRelationships);
+
+        // All ports have now been added/removed as necessary. We can now resolve the port names.
+        updatePortsToFinalNames(proposedPortFinalNames);
+
+        // Start all components that are queued up to be started now
+        context.getComponentScheduler().resume();
+    }
+
+    private void synchronizeChildGroups(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, VersionedParameterContext> versionedParameterContexts,
+                                        final Map<String, ProcessGroup> childGroupsByVersionedId) throws ProcessorInstantiationException {
+
+        for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) {
+            final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier());
+            final VersionedFlowCoordinates childCoordinates = proposedChildGroup.getVersionedFlowCoordinates();
+
+            // if there is a nested process group that is versioned controlled, make sure get the param contexts that go with that snapshot
+            // instead of the ones from the parent which would have been passed in to this method
+            Map<String, VersionedParameterContext> childParameterContexts = versionedParameterContexts;
+            if (childCoordinates != null && syncOptions.isUpdateDescendantVersionedFlows()) {
+                final String childParameterContextName = proposedChildGroup.getParameterContextName();
+                if (childParameterContextName != null && !versionedParameterContexts.containsKey(childParameterContextName)) {
+                    childParameterContexts = getVersionedParameterContexts(childCoordinates);
+                } else {
+                    childParameterContexts = versionedParameterContexts;
+                }
+            }
+
+            if (childGroup == null) {
+                final ProcessGroup added = addProcessGroup(group, proposedChildGroup, context.getComponentIdGenerator(), preExistingVariables, childParameterContexts);
+                context.getFlowManager().onProcessGroupAdded(added);
+                added.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize);
+                LOG.info("Added {} to {}", added, group);
+            } else if (childCoordinates == null || syncOptions.isUpdateDescendantVersionedFlows()) {
+
+                final StandardProcessGroupSynchronizer sync = new StandardProcessGroupSynchronizer(context);
+                sync.setPreExistingVariables(preExistingVariables);
+                sync.setUpdatedVersionedComponentIds(updatedVersionedComponentIds);
+                final GroupSynchronizationOptions options = GroupSynchronizationOptions.Builder.from(syncOptions)
+                    .updateGroupSettings(true)
+                    .build();
+
+                sync.setSynchronizationOptions(options);
+                sync.synchronize(childGroup, proposedChildGroup, childParameterContexts);
+
+                LOG.info("Updated {}", childGroup);
+            }
+        }
+    }
+
+    private void synchronizeControllerServices(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, ControllerServiceNode> servicesByVersionedId) {
+        // Controller Services have to be handled a bit differently than other components. This is because Processors and Controller
+        // Services may reference other Controller Services. Since we may be adding Service A, which depends on Service B, before adding
+        // Service B, we need to ensure that we create all Controller Services first and then call updateControllerService for each
+        // Controller Service. This way, we ensure that all services have been created before setting the properties. This allows us to
+        // properly obtain the correct mapping of Controller Service VersionedComponentID to Controller Service instance id.
+        final Map<ControllerServiceNode, VersionedControllerService> services = new HashMap<>();
+
+        // Add any Controller Service that does not yet exist.
+        final Map<String, ControllerServiceNode> servicesAdded = new HashMap<>();
+        for (final VersionedControllerService proposedService : proposed.getControllerServices()) {
+            ControllerServiceNode service = servicesByVersionedId.get(proposedService.getIdentifier());
+            if (service == null) {
+                service = addControllerService(group, proposedService.getIdentifier(), proposedService.getInstanceIdentifier(),
+                    proposedService.getType(), proposedService.getBundle(), context.getComponentIdGenerator());
+
+                LOG.info("Added {} to {}", service, group);
+                servicesAdded.put(proposedService.getIdentifier(), service);
+            }
+
+            services.put(service, proposedService);
+        }
+
+        // Because we don't know what order to instantiate the Controller Services, it's possible that we have two services such that Service A references Service B.
+        // If Service A happens to get created before Service B, the identifiers won't get matched up. As a result, we now iterate over all created Controller Services
+        // and update them again now that all Controller Services have been created at this level, so that the linkage can now be properly established.
+        for (final VersionedControllerService proposedService : proposed.getControllerServices()) {
+            final ControllerServiceNode addedService = servicesAdded.get(proposedService.getIdentifier());
+            if (addedService == null) {
+                continue;
+            }
+
+            updateControllerService(addedService, proposedService);
+        }
+
+        // Update all of the Controller Services to match the VersionedControllerService
+        for (final Map.Entry<ControllerServiceNode, VersionedControllerService> entry : services.entrySet()) {
+            final ControllerServiceNode service = entry.getKey();
+            final VersionedControllerService proposedService = entry.getValue();
+
+            if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) {
+                updateControllerService(service, proposedService);
+                LOG.info("Updated {}", service);
+            }
+        }
+
+        // Determine all Controller Services whose scheduled state indicate they should be enabled.
+        final Set<ControllerServiceNode> toEnable = new HashSet<>();
+        for (final Map.Entry<ControllerServiceNode, VersionedControllerService> entry : services.entrySet()) {
+            if (entry.getValue().getScheduledState() == org.apache.nifi.flow.ScheduledState.ENABLED) {
+                toEnable.add(entry.getKey());
+            }
+        }
+
+        // Perform Validation so we can enable controller services and then enable them
+        toEnable.forEach(ComponentNode::performValidation);
+
+        // Enable the services. We have to do this at the end, after creating all of them, in case one service depends on another and
+        // therefore is not valid until all have been created.
+        toEnable.forEach(service -> {
+            if (service.getState() == ControllerServiceState.DISABLED) {
+                LOG.debug("Enabling {}", service);
+                context.getControllerServiceProvider().enableControllerServicesAsync(Collections.singleton(service));
+            }
+        });
+    }
+
+    private void removeMissingConnections(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, Connection> connectionsByVersionedId) {
+        final Set<String> connectionsRemoved = new HashSet<>(connectionsByVersionedId.keySet());
+
+        for (final VersionedConnection proposedConnection : proposed.getConnections()) {
+            connectionsRemoved.remove(proposedConnection.getIdentifier());
+        }
+
+        for (final String removedVersionedId : connectionsRemoved) {
+            final Connection connection = connectionsByVersionedId.get(removedVersionedId);
+            LOG.info("Removing {} from {}", connection, group);
+            group.removeConnection(connection);
+            context.getFlowManager().onConnectionRemoved(connection);
+        }
+    }
+
+    private void synchronizeConnections(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, Connection> connectionsByVersionedId) {
+        // Add and update Connections
+        for (final VersionedConnection proposedConnection : proposed.getConnections()) {
+            final Connection connection = connectionsByVersionedId.get(proposedConnection.getIdentifier());
+            if (connection == null) {
+                final Connection added = addConnection(group, proposedConnection, context.getComponentIdGenerator());
+                context.getFlowManager().onConnectionAdded(added);
+                LOG.info("Added {} to {}", added, group);
+            } else if (isUpdateable(connection)) {
+                // If the connection needs to be updated, then the source and destination will already have
+                // been stopped (else, the validation above would fail). So if the source or the destination is running,
+                // then we know that we don't need to update the connection.
+                updateConnection(connection, proposedConnection);
+                LOG.info("Updated {}", connection);
+            }
+        }
+    }
+
+    private Set<String> updateConnectionDestinations(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, Connection> connectionsByVersionedId) {
+
+        final Set<String> connectionsWithTempDestination = new HashSet<>();
+        for (final VersionedConnection proposedConnection : proposed.getConnections()) {
+            final Connection connection = connectionsByVersionedId.get(proposedConnection.getIdentifier());
+            if (connection == null) {
+                continue;
+            }
+
+            // If the Connection's destination didn't change, nothing to do
+            final String destinationVersionId = connection.getDestination().getVersionedComponentId().orElse(null);
+            final String proposedDestinationId = proposedConnection.getDestination().getId();
+            if (Objects.equals(destinationVersionId, proposedDestinationId)) {
+                continue;
+            }
+
+            // Find the destination of the connection. If the destination doesn't yet exist (because it's part of the proposed Process Group but not yet added),
+            // we will set the destination to a temporary destination. Then, after adding components, we will update the destinations again.
+            Connectable newDestination = getConnectable(group, proposedConnection.getDestination());
+            if (newDestination == null) {
+                final Funnel temporaryDestination = getTemporaryFunnel(connection.getProcessGroup());
+                LOG.debug("Updated Connection {} to have a temporary destination of {}", connection, temporaryDestination);
+                newDestination = temporaryDestination;
+                connectionsWithTempDestination.add(proposedConnection.getIdentifier());
+            }
+
+            connection.setDestination(newDestination);
+        }
+
+        return connectionsWithTempDestination;
+    }
+
+    private Funnel getTemporaryFunnel(final ProcessGroup group) {
+        final String tempFunnelId = group.getIdentifier() + TEMP_FUNNEL_ID_SUFFIX;
+        Funnel temporaryFunnel = context.getFlowManager().getFunnel(tempFunnelId);
+        if (temporaryFunnel == null) {
+            temporaryFunnel = context.getFlowManager().createFunnel(tempFunnelId);
+            temporaryFunnel.setPosition(new Position(0, 0));
+            group.addFunnel(temporaryFunnel, false);
+        }
+
+        return temporaryFunnel;
+    }
+
+    private void restoreConnectionDestinations(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, Connection> connectionsByVersionedId,
+                                               final Set<String> connectionsWithTempDestination) {
+        if (connectionsWithTempDestination.isEmpty()) {
+            LOG.debug("No connections with temporary destinations for {}", group);
+            return;
+        }
+
+        final Map<String, VersionedConnection> versionedConnectionsById = proposed.getConnections().stream()
+            .collect(Collectors.toMap(VersionedConnection::getIdentifier, Function.identity()));
+
+        for (final String connectionId : connectionsWithTempDestination) {
+            final Connection connection = connectionsByVersionedId.get(connectionId);
+            final VersionedConnection versionedConnection = versionedConnectionsById.get(connectionId);
+
+            final Connectable newDestination = getConnectable(group, versionedConnection.getDestination());
+            if (newDestination != null) {
+                LOG.debug("Updated Connection {} from its temporary destination to its correct destination of {}", connection, newDestination);
+                connection.setDestination(newDestination);
+            }
+        }
+    }
+
+    private void removeTemporaryFunnel(final ProcessGroup group) {
+        final String tempFunnelId = group.getIdentifier() + TEMP_FUNNEL_ID_SUFFIX;
+        final Funnel temporaryFunnel = context.getFlowManager().getFunnel(tempFunnelId);
+        if (temporaryFunnel == null) {
+            LOG.debug("No temporary funnel to remove for {}", group);
+            return;
+        }
+
+        if (temporaryFunnel.getIncomingConnections().isEmpty()) {
+            LOG.debug("Updated all temporary connections for {}. Removing Temporary funnel from flow", group);
+            group.removeFunnel(temporaryFunnel);
+        } else {
+            LOG.warn("The temporary funnel {} for {} still has {} connections. It cannot be removed.", temporaryFunnel, group, temporaryFunnel.getIncomingConnections().size());
+        }
+    }
+
+    private <T extends Connectable> Map<String, T> componentsById(final ProcessGroup group, final Function<ProcessGroup, Collection<T>> retrieveComponents) {
+        return retrieveComponents.apply(group).stream()
+            .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(
+                NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())), Function.identity()));
+    }
+
+
+    private <T> Map<String, T> componentsById(final ProcessGroup group, final Function<ProcessGroup, Collection<T>> retrieveComponents,
+                                              final Function<T, String> retrieveId, final Function<T, Optional<String>> retrieveVersionedComponentId) {
+
+        return retrieveComponents.apply(group).stream()
+            .collect(Collectors.toMap(component -> retrieveVersionedComponentId.apply(component).orElse(
+                NiFiRegistryFlowMapper.generateVersionedComponentId(retrieveId.apply(component))), Function.identity()));
+    }
+
+
+    private void synchronizeFunnels(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, Funnel> funnelsByVersionedId) {
+        for (final VersionedFunnel proposedFunnel : proposed.getFunnels()) {
+            final Funnel funnel = funnelsByVersionedId.get(proposedFunnel.getIdentifier());
+            if (funnel == null) {
+                final Funnel added = addFunnel(group, proposedFunnel, context.getComponentIdGenerator());
+                context.getFlowManager().onFunnelAdded(added);
+                LOG.info("Added {} to {}", added, group);
+            } else if (updatedVersionedComponentIds.contains(proposedFunnel.getIdentifier())) {
+                updateFunnel(funnel, proposedFunnel);
+                LOG.info("Updated {}", funnel);
+            } else {
+                funnel.setPosition(new Position(proposedFunnel.getPosition().getX(), proposedFunnel.getPosition().getY()));
+            }
+        }
+    }
+
+    private void synchronizeInputPorts(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<Port, String> proposedPortFinalNames,
+                                                              final Map<String, Port> inputPortsByVersionedId) {
+        for (final VersionedPort proposedPort : proposed.getInputPorts()) {
+            final Port port = inputPortsByVersionedId.get(proposedPort.getIdentifier());
+            if (port == null) {
+                final String temporaryName = generateTemporaryPortName(proposedPort);
+                final Port added = addInputPort(group, proposedPort, context.getComponentIdGenerator(), temporaryName);
+                proposedPortFinalNames.put(added, proposedPort.getName());
+                context.getFlowManager().onInputPortAdded(added);
+                LOG.info("Added {} to {}", added, group);
+            } else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) {
+                final String temporaryName = generateTemporaryPortName(proposedPort);
+                proposedPortFinalNames.put(port, proposedPort.getName());
+                updatePort(port, proposedPort, temporaryName);
+                LOG.info("Updated {}", port);
+            } else {
+                port.setPosition(new Position(proposedPort.getPosition().getX(), proposedPort.getPosition().getY()));
+            }
+        }
+    }
+
+    private void synchronizeOutputPorts(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<Port, String> proposedPortFinalNames,
+                                        final Map<String, Port> outputPortsByVersionedId) {
+
+        for (final VersionedPort proposedPort : proposed.getOutputPorts()) {
+            final Port port = outputPortsByVersionedId.get(proposedPort.getIdentifier());
+            if (port == null) {
+                final String temporaryName = generateTemporaryPortName(proposedPort);
+                final Port added = addOutputPort(group, proposedPort, context.getComponentIdGenerator(), temporaryName);
+                proposedPortFinalNames.put(added, proposedPort.getName());
+                context.getFlowManager().onOutputPortAdded(added);
+                LOG.info("Added {} to {}", added, group);
+            } else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) {
+                final String temporaryName = generateTemporaryPortName(proposedPort);
+                proposedPortFinalNames.put(port, proposedPort.getName());
+                updatePort(port, proposedPort, temporaryName);
+                LOG.info("Updated {}", port);
+            } else {
+                port.setPosition(new Position(proposedPort.getPosition().getX(), proposedPort.getPosition().getY()));
+            }
+        }
+    }
+
+    private void updatePortsToFinalNames(final Map<Port, String> proposedPortFinalNames) {
+        // Now that all input/output ports have been removed, we should be able to update
+        // all ports to the final name that was proposed in the new flow version.
+        for (final Map.Entry<Port, String> portAndFinalName : proposedPortFinalNames.entrySet()) {
+            final Port port = portAndFinalName.getKey();
+            final String finalName = portAndFinalName.getValue();
+            LOG.info("Updating {} to replace temporary name with final name", port);
+
+            // For public ports we need to consider if another public port exists somewhere else in the flow with the
+            // same name, and if so then rename the incoming port so the flow can still be imported
+            if (port instanceof PublicPort) {
+                final PublicPort publicPort = (PublicPort) port;
+                final String publicPortFinalName = getPublicPortFinalName(publicPort, finalName);
+                updatePortToSetFinalName(publicPort, publicPortFinalName);
+            } else {
+                updatePortToSetFinalName(port, finalName);
+            }
+        }
+    }
+
+    private void synchronizeLabels(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, Label> labelsByVersionedId) {
+        for (final VersionedLabel proposedLabel : proposed.getLabels()) {
+            final Label label = labelsByVersionedId.get(proposedLabel.getIdentifier());
+            if (label == null) {
+                final Label added = addLabel(group, proposedLabel, context.getComponentIdGenerator());
+                LOG.info("Added {} to {}", added, group);
+            } else if (updatedVersionedComponentIds.contains(proposedLabel.getIdentifier())) {
+                updateLabel(label, proposedLabel);
+                LOG.info("Updated {}", label);
+            } else {
+                label.setPosition(new Position(proposedLabel.getPosition().getX(), proposedLabel.getPosition().getY()));
+            }
+        }
+    }
+
+    private void removeMissingProcessors(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, ProcessorNode> processorsByVersionedId) {
+        removeMissingComponents(group, proposed, processorsByVersionedId, VersionedProcessGroup::getProcessors, ProcessGroup::removeProcessor);
+    }
+
+    private void removeMissingInputPorts(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, Port> portsByVersionedId) {
+        removeMissingComponents(group, proposed, portsByVersionedId, VersionedProcessGroup::getInputPorts, ProcessGroup::removeInputPort);
+    }
+
+    private void removeMissingOutputPorts(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, Port> portsByVersionedId) {
+        removeMissingComponents(group, proposed, portsByVersionedId, VersionedProcessGroup::getOutputPorts, ProcessGroup::removeOutputPort);
+    }
+
+    private void removeMissingLabels(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, Label> labelsByVersionedId) {
+        removeMissingComponents(group, proposed, labelsByVersionedId, VersionedProcessGroup::getLabels, ProcessGroup::removeLabel);
+    }
+
+    private void removeMissingFunnels(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, Funnel> funnelsByVersionedId) {
+        removeMissingComponents(group, proposed, funnelsByVersionedId, VersionedProcessGroup::getFunnels, (removalGroup, funnelToRemove) -> {
+            // Skip our temporary funnel
+            if (funnelToRemove.getIdentifier().equals(removalGroup.getIdentifier() + TEMP_FUNNEL_ID_SUFFIX)) {
+                return;
+            }
+
+            removalGroup.removeFunnel(funnelToRemove);
+        });
+    }
+
+    private void removeMissingRpg(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, RemoteProcessGroup> rpgsByVersionedId) {
+        removeMissingComponents(group, proposed, rpgsByVersionedId, VersionedProcessGroup::getRemoteProcessGroups, ProcessGroup::removeRemoteProcessGroup);
+    }
+
+    private void removeMissingControllerServices(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, ControllerServiceNode> servicesByVersionedId) {
+        final BiConsumer<ProcessGroup, ControllerServiceNode> componentRemoval = (grp, service) -> context.getControllerServiceProvider().removeControllerService(service);
+        removeMissingComponents(group, proposed, servicesByVersionedId, VersionedProcessGroup::getControllerServices, componentRemoval);
+    }
+
+    private void removeMissingChildGroups(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, ProcessGroup> groupsByVersionedId) {
+        removeMissingComponents(group, proposed, groupsByVersionedId, VersionedProcessGroup::getProcessGroups,
+            (procGroup, childGroup) -> {
+                // We cannot remove a Process Group unless it is empty. At this point, we've already removed
+                // all Processors, Input Ports, etc. that are no longer needed. However, we have not removed all
+                // Process Groups. We may have a situation where we have nested Process Groups, each one consisting
+                // now of only other Process Groups that can be removed, such as A -> B -> C -> D.
+                // Each of these is a Process Group that contains only other (otherwise empty) process groups.
+                // To accomplish this, we need to use a depth-first approach, removing the inner-most group (D),
+                // then C, then B, and finally A.
+                if (!childGroup.isEmpty()) {
+                    purgeChildGroupOfEmptyChildren(childGroup);
+                }
+
+                procGroup.removeProcessGroup(childGroup);
+            });
+    }
+
+    private void purgeChildGroupOfEmptyChildren(final ProcessGroup group) {
+        for (final ProcessGroup child : group.getProcessGroups()) {
+            purgeChildGroupOfEmptyChildren(child);
+
+            if (child.isEmpty()) {
+                group.removeProcessGroup(child);
+            }
+        }
+    }
+
+    private <C, V extends VersionedComponent> void removeMissingComponents(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, C> componentsById,
+                                             final Function<VersionedProcessGroup, Collection<V>> getVersionedComponents, final BiConsumer<ProcessGroup, C> removeComponent) {
+
+        // Determine the ID's of the components to remove. To do this, we get the ID's of all components in the Process Group,
+        // and then remove from that the ID's of the components in the proposed group. That leaves us with the ID's of components
+        // that exist currently that are not in the proposed flow.
+        final Set<String> idsOfComponentsToRemove = new HashSet<>(componentsById.keySet());
+        for (final V versionedComponent : getVersionedComponents.apply(proposed)) {
+            idsOfComponentsToRemove.remove(versionedComponent.getIdentifier());
+        }
+
+        // Remove any of those components
+        for (final String idToRemove : idsOfComponentsToRemove) {
+            final C toRemove = componentsById.get(idToRemove);
+            LOG.info("Removing {} from {}", toRemove, group);
+            removeComponent.accept(group, toRemove);
+        }
+    }
+
+
+    private void synchronizeProcessors(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<ProcessorNode, Set<Relationship>> autoTerminatedRelationships,
+                                                                       final Map<String, ProcessorNode> processorsByVersionedId) throws ProcessorInstantiationException {
+        for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) {
+            final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier());
+            if (processor == null) {
+                final ProcessorNode added = addProcessor(group, proposedProcessor, context.getComponentIdGenerator());
+                context.getFlowManager().onProcessorAdded(added);
+
+                final Set<Relationship> proposedAutoTerminated =
+                    proposedProcessor.getAutoTerminatedRelationships() == null ? Collections.emptySet() : proposedProcessor.getAutoTerminatedRelationships().stream()
+                        .map(added::getRelationship)
+                        .collect(Collectors.toSet());
+                autoTerminatedRelationships.put(added, proposedAutoTerminated);
+                LOG.info("Added {} to {}", added, group);
+            } else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) {
+                updateProcessor(processor, proposedProcessor);
+
+                final Set<Relationship> proposedAutoTerminated =
+                    proposedProcessor.getAutoTerminatedRelationships() == null ? Collections.emptySet() : proposedProcessor.getAutoTerminatedRelationships().stream()
+                        .map(processor::getRelationship)
+                        .collect(Collectors.toSet());
+
+                if (!processor.getAutoTerminatedRelationships().equals(proposedAutoTerminated)) {
+                    autoTerminatedRelationships.put(processor, proposedAutoTerminated);
+                }
+
+                LOG.info("Updated {}", processor);
+            } else {
+                processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY()));
+            }
+        }
+    }
+
+    private void synchronizeRemoteGroups(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, RemoteProcessGroup> rpgsByVersionedId) {
+        for (final VersionedRemoteProcessGroup proposedRpg : proposed.getRemoteProcessGroups()) {
+            final RemoteProcessGroup rpg = rpgsByVersionedId.get(proposedRpg.getIdentifier());
+            if (rpg == null) {
+                final RemoteProcessGroup added = addRemoteProcessGroup(group, proposedRpg, context.getComponentIdGenerator());
+                LOG.info("Added {} to {}", added, group);
+            } else if (updatedVersionedComponentIds.contains(proposedRpg.getIdentifier())) {
+                updateRemoteProcessGroup(rpg, proposedRpg, context.getComponentIdGenerator());
+                LOG.info("Updated {}", rpg);
+            } else {
+                rpg.setPosition(new Position(proposedRpg.getPosition().getX(), proposedRpg.getPosition().getY()));
+            }
+        }
+    }
+
+    @Override
+    public void verifyCanSynchronize(final ProcessGroup group, final VersionedProcessGroup flowContents, final boolean verifyConnectionRemoval) {
+        // Ensure no deleted child process groups contain templates and optionally no deleted connections contain data
+        // in their queue. Note that this check enforces ancestry among the group components to avoid a scenario where
+        // a component is matched by id, but it does not exist in the same hierarchy and thus will be removed and
+        // re-added when the update is performed
+        verifyCanRemoveMissingComponents(group, flowContents, verifyConnectionRemoval);
+
+        // Determine which input ports were removed from this process group
+        final Map<String, Port> removedInputPortsByVersionId = new HashMap<>();
+        group.getInputPorts()
+            .forEach(port -> removedInputPortsByVersionId.put(port.getVersionedComponentId().orElse(
+                NiFiRegistryFlowMapper.generateVersionedComponentId(port.getIdentifier())), port));
+
+        flowContents.getInputPorts().stream()
+            .map(VersionedPort::getIdentifier)
+            .forEach(removedInputPortsByVersionId::remove);
+
+        // Ensure that there are no incoming connections for any Input Port that was removed.
+        for (final Port inputPort : removedInputPortsByVersionId.values()) {
+            final List<Connection> incomingConnections = inputPort.getIncomingConnections();
+            if (!incomingConnections.isEmpty()) {
+                throw new IllegalStateException(group + " cannot be updated to the proposed flow because the proposed flow "
+                    + "does not contain the Input Port " + inputPort + " and the Input Port currently has an incoming connection");
+            }
+        }
+
+        // Determine which output ports were removed from this process group
+        final Map<String, Port> removedOutputPortsByVersionId = new HashMap<>();
+        group.getOutputPorts()
+            .forEach(port -> removedOutputPortsByVersionId.put(port.getVersionedComponentId().orElse(
+                NiFiRegistryFlowMapper.generateVersionedComponentId(port.getIdentifier())), port));
+
+        flowContents.getOutputPorts().stream()
+            .map(VersionedPort::getIdentifier)
+            .forEach(removedOutputPortsByVersionId::remove);
+
+        // Ensure that there are no outgoing connections for any Output Port that was removed.
+        for (final Port outputPort : removedOutputPortsByVersionId.values()) {
+            final Set<Connection> outgoingConnections = outputPort.getConnections();
+            if (!outgoingConnections.isEmpty()) {
+                throw new IllegalStateException(group + " cannot be updated to the proposed flow because the proposed flow "
+                    + "does not contain the Output Port " + outputPort + " and the Output Port currently has an outgoing connection");
+            }
+        }
+
+        // Ensure that all Processors are instantiable
+        final Map<String, VersionedProcessor> proposedProcessors = new HashMap<>();
+        findAllProcessors(flowContents, proposedProcessors);
+
+        group.findAllProcessors()
+            .forEach(proc -> proposedProcessors.remove(proc.getVersionedComponentId().orElse(
+                NiFiRegistryFlowMapper.generateVersionedComponentId(proc.getIdentifier()))));
+
+        for (final VersionedProcessor processorToAdd : proposedProcessors.values()) {
+            final String processorToAddClass = processorToAdd.getType();
+            final BundleCoordinate processorToAddCoordinate = toCoordinate(processorToAdd.getBundle());
+
+            // Get the exact bundle requested, if it exists.
+            final Bundle bundle = processorToAdd.getBundle();
+            final BundleCoordinate coordinate = new BundleCoordinate(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion());
+            final org.apache.nifi.bundle.Bundle resolved = context.getExtensionManager().getBundle(coordinate);
+
+            if (resolved == null) {
+                // Could not resolve the bundle explicitly. Check for possible bundles.
+                final List<org.apache.nifi.bundle.Bundle> possibleBundles = context.getExtensionManager().getBundles(processorToAddClass);
+                final boolean bundleExists = possibleBundles.stream()
+                    .anyMatch(b -> processorToAddCoordinate.equals(b.getBundleDetails().getCoordinate()));
+
+                if (!bundleExists && possibleBundles.size() != 1) {
+                    LOG.warn("Unknown bundle {} for processor type {} - will use Ghosted component instead", processorToAddCoordinate, processorToAddClass);
+                }
+            }
+        }
+
+        // Ensure that all Controller Services are instantiable
+        final Map<String, VersionedControllerService> proposedServices = new HashMap<>();
+        findAllControllerServices(flowContents, proposedServices);
+
+        group.findAllControllerServices()
+            .forEach(service -> proposedServices.remove(service.getVersionedComponentId().orElse(
+                NiFiRegistryFlowMapper.generateVersionedComponentId(service.getIdentifier()))));
+
+        for (final VersionedControllerService serviceToAdd : proposedServices.values()) {
+            final String serviceToAddClass = serviceToAdd.getType();
+            final BundleCoordinate serviceToAddCoordinate = toCoordinate(serviceToAdd.getBundle());
+
+            final org.apache.nifi.bundle.Bundle resolved = context.getExtensionManager().getBundle(serviceToAddCoordinate);
+            if (resolved == null) {
+                final List<org.apache.nifi.bundle.Bundle> possibleBundles = context.getExtensionManager().getBundles(serviceToAddClass);
+                final boolean bundleExists = possibleBundles.stream()
+                    .anyMatch(b -> serviceToAddCoordinate.equals(b.getBundleDetails().getCoordinate()));
+
+                if (!bundleExists && possibleBundles.size() != 1) {
+                    LOG.warn("Unknown bundle {} for processor type {} - will use Ghosted component instead", serviceToAddCoordinate, serviceToAddClass);
+//                    throw new IllegalArgumentException("Unknown bundle " + serviceToAddCoordinate.toString() + " for service type " + serviceToAddClass);
+                }
+            }
+        }
+
+        // Ensure that all Prioritizers are instantiate-able and that any load balancing configuration is correct

Review comment:
       consistency with "instantiable"

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
##########
@@ -2559,7 +2559,7 @@ public void verifyCanInstantiate(final String groupId, final FlowSnippetDTO snip
 
     @Override
     public void verifyComponentTypes(final VersionedProcessGroup versionedGroup) {
-        controllerFacade.verifyComponentTypes(versionedGroup);
+//        controllerFacade.verifyComponentTypes(versionedGroup);

Review comment:
       maybe a comment to indicate intent (commented out)?

##########
File path: nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/DifferenceType.java
##########
@@ -120,6 +120,36 @@
      */
     PROPERTY_PARAMETERIZATION_REMOVED("Property Parameterization Removed"),
 
+    /**
+     * Parameter exists for a given Parameter Context in Flow B but does not exist in Flow A
+     */
+    PARAMETER_ADDED("Parameter Added"),
+
+    /**
+     * Parameter exists for a given Parameter Context in Flow A but does not exist in Flow B
+     */
+    PARAMETER_REMOVED("Parameter Removed"),
+
+    /**
+     * The value of the Parameter is different each of the flows

Review comment:
       in each

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
##########
@@ -0,0 +1,978 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.serialization;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.AuthorizerCapabilityDetection;
+import org.apache.nifi.authorization.ManagedAuthorizer;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.cluster.protocol.DataFlow;
+import org.apache.nifi.cluster.protocol.StandardDataFlow;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Position;
+import org.apache.nifi.controller.AbstractComponentNode;
+import org.apache.nifi.controller.ComponentNode;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.MissingBundleException;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.SnippetManager;
+import org.apache.nifi.controller.StandardSnippet;
+import org.apache.nifi.controller.Template;
+import org.apache.nifi.controller.UninheritableFlowException;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.controller.flow.VersionedFlowEncodingVersion;
+import org.apache.nifi.controller.flow.VersionedRegistry;
+import org.apache.nifi.controller.flow.VersionedTemplate;
+import org.apache.nifi.controller.inheritance.AuthorizerCheck;
+import org.apache.nifi.controller.inheritance.BundleCompatibilityCheck;
+import org.apache.nifi.controller.inheritance.ConnectionMissingCheck;
+import org.apache.nifi.controller.inheritance.FlowInheritability;
+import org.apache.nifi.controller.inheritance.FlowInheritabilityCheck;
+import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.encrypt.PropertyEncryptor;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ScheduledState;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flow.VersionedReportingTask;
+import org.apache.nifi.groups.AbstractComponentScheduler;
+import org.apache.nifi.groups.BundleUpdateStrategy;
+import org.apache.nifi.groups.ComponentIdGenerator;
+import org.apache.nifi.groups.ComponentScheduler;
+import org.apache.nifi.groups.GroupSynchronizationOptions;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.parameter.Parameter;
+import org.apache.nifi.parameter.ParameterContext;
+import org.apache.nifi.parameter.ParameterContextManager;
+import org.apache.nifi.parameter.ParameterDescriptor;
+import org.apache.nifi.persistence.FlowConfigurationArchiveManager;
+import org.apache.nifi.registry.flow.FlowRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedParameter;
+import org.apache.nifi.registry.flow.VersionedParameterContext;
+import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
+import org.apache.nifi.registry.flow.diff.DifferenceDescriptor;
+import org.apache.nifi.registry.flow.diff.FlowComparator;
+import org.apache.nifi.registry.flow.diff.FlowComparison;
+import org.apache.nifi.registry.flow.diff.FlowDifference;
+import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow;
+import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
+import org.apache.nifi.registry.flow.diff.StaticDifferenceDescriptor;
+import org.apache.nifi.registry.flow.mapping.ComponentIdLookup;
+import org.apache.nifi.registry.flow.mapping.FlowMappingOptions;
+import org.apache.nifi.registry.flow.mapping.VersionedComponentStateLookup;
+import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.services.FlowService;
+import org.apache.nifi.util.BundleUtils;
+import org.apache.nifi.util.FlowDifferenceFilters;
+import org.apache.nifi.util.file.FileUtils;
+import org.apache.nifi.web.api.dto.BundleDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+
+public class VersionedFlowSynchronizer implements FlowSynchronizer {
+    private static final Logger logger = LoggerFactory.getLogger(VersionedFlowSynchronizer.class);
+    private static final String ENCRYPTED_VALUE_PREFIX = "enc{";
+    private static final String ENCRYPTED_VALUE_SUFFIX = "}";
+
+    private final PropertyEncryptor encryptor;
+    private final ExtensionManager extensionManager;
+    private final File flowStorageFile;
+    private final FlowConfigurationArchiveManager archiveManager;
+
+    public VersionedFlowSynchronizer(final PropertyEncryptor encryptor, final ExtensionManager extensionManager, final File flowStorageFile, final FlowConfigurationArchiveManager archiveManager) {
+        this.encryptor = encryptor;
+        this.extensionManager = extensionManager;
+        this.flowStorageFile = flowStorageFile;
+        this.archiveManager = archiveManager;
+    }
+
+
+    public synchronized void sync(final FlowController controller, final DataFlow proposedFlow, final PropertyEncryptor encryptor, final FlowService flowService,
+                                  final BundleUpdateStrategy bundleUpdateStrategy)
+                    throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException, MissingBundleException {
+
+        final long start = System.currentTimeMillis();
+        final FlowManager flowManager = controller.getFlowManager();
+        final ProcessGroup root = flowManager.getRootGroup();
+
+        // handle corner cases involving no proposed flow
+        if (proposedFlow == null) {
+            if (root.isEmpty()) {
+                return;  // no sync to perform
+            } else {
+                throw new UninheritableFlowException("Proposed configuration is empty, but the controller contains a data flow.");
+            }
+        }
+
+        // determine if the controller already had flow sync'd to it
+        final boolean flowAlreadySynchronized = controller.isFlowSynchronized();
+        logger.info("Synching FlowController with proposed flow: Controller Already Synchronized = {}", flowAlreadySynchronized);
+
+        // If bundle update strategy is configured to allow for compatible bundles, update any components to use compatible bundles if
+        // the exact bundle does not exist.
+        if (bundleUpdateStrategy == BundleUpdateStrategy.USE_SPECIFIED_OR_COMPATIBLE_OR_GHOST) {
+            mapCompatibleBundles(proposedFlow, controller.getExtensionManager());
+        }
+
+        // serialize controller state to bytes
+        final DataFlow existingDataFlow = getExistingDataFlow(controller);
+        checkFlowInheritability(existingDataFlow, proposedFlow, controller, bundleUpdateStrategy);
+
+        final FlowComparison flowComparison = compareFlows(existingDataFlow, proposedFlow, encryptor);
+        final Set<FlowDifference> flowDifferences = flowComparison.getDifferences();
+        if (flowDifferences.isEmpty()) {
+            logger.debug("No differences between current flow and proposed flow. Will not create backup of existing flow.");
+        } else if (isExistingFlowEmpty(controller)) {
+            logger.debug("Currently loaded dataflow is empty. Will not create backup of existing flow.");
+        } else {
+            backupExistingFlow();
+        }
+
+        final AffectedComponentSet affectedComponents = determineAffectedComponents(flowComparison, controller);
+        final AffectedComponentSet activeSet = affectedComponents.toActiveSet();
+
+        // Stop the active components, and then wait for all components to be stopped.
+        logger.info("In order to inherit proposed dataflow, will stop any components that will be affected by the update");
+        activeSet.stop();
+
+        try {
+            // Ensure that the proposed flow doesn't remove any Connections for which there is currently data queued
+            verifyNoConnectionsWithDataRemoved(existingDataFlow, proposedFlow, controller, flowComparison);
+
+            synchronizeFlow(controller, existingDataFlow, proposedFlow, affectedComponents);
+        } finally {
+            // We have to call toExistingSet() here because some of the components that existed in the active set may no longer exist,
+            // so attempting to start them will fail.
+            activeSet.toExistingSet().toStartableSet().start();
+        }
+
+        final long millis = System.currentTimeMillis() - start;
+        logger.info("Successfully synchronized dataflow with the proposed flow in {} millis", millis);
+    }
+
+    private void verifyNoConnectionsWithDataRemoved(final DataFlow existingFlow, final DataFlow proposedFlow, final FlowController controller, final FlowComparison flowComparison) {
+        logger.debug("Checking that no connections were removed that have data");
+        final FlowInheritabilityCheck processGroupInheritableCheck = new ConnectionMissingCheck(flowComparison);
+        final FlowInheritability inheritability = processGroupInheritableCheck.checkInheritability(existingFlow, proposedFlow, controller);
+
+        if (inheritability.isInheritable()) {
+            logger.debug("Proposed flow contains all connections that currently have data queued. Will backup existing flow and replace, provided all other checks pass");
+        } else {
+            throw new UninheritableFlowException("Proposed flow is not inheritable by the flow controller and cannot completely replace the current flow due to: "
+                + inheritability.getExplanation());
+        }
+    }
+
+    private void mapCompatibleBundles(final DataFlow proposedFlow, final ExtensionManager extensionManager) {
+        final Set<String> missingComponentIds = proposedFlow.getMissingComponents();
+        final VersionedDataflow dataflow = proposedFlow.getVersionedDataflow();
+
+        for (final VersionedReportingTask reportingTask : dataflow.getReportingTasks()) {
+            if (missingComponentIds.contains(reportingTask.getInstanceIdentifier())) {
+                continue;
+            }
+
+            final Bundle compatibleBundle = getCompatibleBundle(reportingTask.getBundle(), extensionManager, reportingTask.getType());
+            if (compatibleBundle != null) {
+                reportingTask.setBundle(compatibleBundle);
+            }
+        }
+
+        for (final VersionedControllerService service : dataflow.getControllerServices()) {
+            if (missingComponentIds.contains(service.getInstanceIdentifier())) {
+                continue;
+            }
+
+            final Bundle compatibleBundle = getCompatibleBundle(service.getBundle(), extensionManager, service.getType());
+            if (compatibleBundle != null) {
+                service.setBundle(compatibleBundle);
+            }
+        }
+
+        mapCompatibleBundles(dataflow.getRootGroup(), extensionManager, missingComponentIds);
+    }
+
+    private void mapCompatibleBundles(final VersionedProcessGroup group, final ExtensionManager extensionManager, final Set<String> missingComponentIds) {
+        for (final VersionedControllerService service : group.getControllerServices()) {
+            if (missingComponentIds.contains(service.getInstanceIdentifier())) {
+                continue;
+            }
+
+            final Bundle compatibleBundle = getCompatibleBundle(service.getBundle(), extensionManager, service.getType());
+            if (compatibleBundle != null) {
+                service.setBundle(compatibleBundle);
+            }
+        }
+
+        for (final VersionedProcessor processor : group.getProcessors()) {
+            if (missingComponentIds.contains(processor.getInstanceIdentifier())) {
+                continue;
+            }
+
+            final Bundle compatibleBundle = getCompatibleBundle(processor.getBundle(), extensionManager, processor.getType());
+            if (compatibleBundle != null) {
+                processor.setBundle(compatibleBundle);
+            }
+        }
+
+        for (final VersionedProcessGroup childGroup : group.getProcessGroups()) {
+            mapCompatibleBundles(childGroup, extensionManager, missingComponentIds);
+        }
+    }
+
+    private Bundle getCompatibleBundle(final Bundle bundle, final ExtensionManager extensionManager, final String type) {
+        final org.apache.nifi.bundle.Bundle exactBundle = extensionManager.getBundle(new BundleCoordinate(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion()));
+        if (exactBundle != null) {
+            return bundle;
+        }
+
+        final BundleDTO bundleDto = new BundleDTO(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion());
+        final Optional<BundleCoordinate> optionalCoordinate = BundleUtils.getOptionalCompatibleBundle(extensionManager, type, bundleDto);
+        if (optionalCoordinate.isPresent()) {
+            final BundleCoordinate coordinate = optionalCoordinate.get();
+            logger.debug("Found compatible bundle {} for {}:{}:{} and type {}", coordinate.getCoordinate(), bundle.getGroup(), bundle.getArtifact(), bundle.getVersion(), type);
+            return new Bundle(coordinate.getGroup(), coordinate.getId(), coordinate.getVersion());
+        }
+
+        logger.debug("Could not find a compatible bundle for {}:{}:{} type {}", bundle.getGroup(), bundle.getArtifact(), bundle.getVersion(), type);
+        return null;
+    }
+
+    private void synchronizeFlow(final FlowController controller, final DataFlow existingFlow, final DataFlow proposedFlow, final AffectedComponentSet affectedComponentSet) {
+        // attempt to sync controller with proposed flow
+        try {
+            final VersionedDataflow versionedFlow = proposedFlow.getVersionedDataflow();
+
+            if (versionedFlow != null) {
+                controller.setMaxTimerDrivenThreadCount(versionedFlow.getMaxTimerDrivenThreadCount());
+                ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
+
+                final Map<String, VersionedParameterContext> versionedParameterContextMap = new HashMap<>();
+                versionedFlow.getParameterContexts().forEach(context -> versionedParameterContextMap.put(context.getName(), context));
+
+                final VersionedFlowSnapshot versionedFlowSnapshot = new VersionedFlowSnapshot();
+                versionedFlowSnapshot.setParameterContexts(versionedParameterContextMap);
+                versionedFlowSnapshot.setFlowContents(versionedFlow.getRootGroup());
+
+                // Inherit controller-level components.
+                inheritControllerServices(controller, versionedFlow, affectedComponentSet);
+                inheritParameterContexts(controller, versionedFlow);
+                inheritReportingTasks(controller, versionedFlow, affectedComponentSet);
+                inheritRegistries(controller, versionedFlow);
+
+                final ComponentIdGenerator componentIdGenerator = (proposedId, instanceId, destinationGroupId) -> instanceId;
+
+                final ComponentScheduler componentScheduler = new FlowControllerComponentScheduler(controller);
+
+                if (rootGroup.isEmpty()) {
+                    final VersionedProcessGroup versionedRoot = versionedFlowSnapshot.getFlowContents();
+                    rootGroup = controller.getFlowManager().createProcessGroup(versionedRoot.getInstanceIdentifier());
+                    rootGroup.setComments(versionedRoot.getComments());
+                    rootGroup.setPosition(new Position(versionedRoot.getPosition().getX(), versionedRoot.getPosition().getY()));
+                    rootGroup.setName(versionedRoot.getName());
+                    controller.setRootGroup(rootGroup);
+                }
+
+                // We must remove templates before attempting to synchronize the Process Group, as synchronizing may result in removal of a Process Group,
+                // which cannot be done while Templates exist. After synchronizing root Process Group, we will inherit any templates in the proposed flow
+                final Set<Template> allTemplates = controller.getFlowManager().getRootGroup().findAllTemplates();
+                allTemplates.forEach(template -> template.getProcessGroup().removeTemplate(template));
+
+                // Synchronize the root group
+                final GroupSynchronizationOptions syncOptions = new GroupSynchronizationOptions.Builder()
+                    .componentIdGenerator(componentIdGenerator)
+                    .componentScheduler(componentScheduler)
+                    .ignoreLocalModifications(true)
+                    .updateGroupSettings(true)
+                    .updateDescendantVersionedFlows(true)
+                    .updateExistingVariables(true)
+                    .updateGroupVersionControlSnapshot(false)
+                    .updateExistingVariables(true)
+                    .updateRpgUrls(true)
+                    .propertyDecryptor(encryptor::decrypt)
+                    .build();
+
+                final FlowMappingOptions flowMappingOptions = new FlowMappingOptions.Builder()
+                    .mapSensitiveConfiguration(true)
+                    .mapPropertyDescriptors(false)
+                    .stateLookup(VersionedComponentStateLookup.IDENTITY_LOOKUP)
+                    .sensitiveValueEncryptor(encryptor::encrypt)
+                    .componentIdLookup(ComponentIdLookup.VERSIONED_OR_GENERATE)
+                    .mapInstanceIdentifiers(true)
+                    .mapControllerServiceReferencesToVersionedId(false)
+                    .build();
+
+                rootGroup.synchronizeFlow(versionedFlowSnapshot, syncOptions, flowMappingOptions);
+
+                // Inherit templates, now that all necessary Process Groups have been created
+                inheritTemplates(controller, versionedFlow);
+
+                rootGroup.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize);
+            }
+
+            inheritSnippets(controller, proposedFlow);
+            inheritAuthorizations(existingFlow, proposedFlow, controller);
+        } catch (final Exception ex) {
+            throw new FlowSynchronizationException(ex);
+        }
+    }
+
+    private FlowComparison compareFlows(final DataFlow existingFlow, final DataFlow proposedFlow, final PropertyEncryptor encryptor) {
+        final DifferenceDescriptor differenceDescriptor = new StaticDifferenceDescriptor();
+
+        final VersionedDataflow existingVersionedFlow = existingFlow.getVersionedDataflow() == null ? createEmptyVersionedDataflow() : existingFlow.getVersionedDataflow();
+        final ComparableDataFlow localDataFlow = new StandardComparableDataFlow("Local Flow", existingVersionedFlow.getRootGroup(), toSet(existingVersionedFlow.getControllerServices()),
+            toSet(existingVersionedFlow.getReportingTasks()), toSet(existingVersionedFlow.getParameterContexts()));
+
+        final VersionedDataflow clusterVersionedFlow = proposedFlow.getVersionedDataflow();
+        final ComparableDataFlow clusterDataFlow = new StandardComparableDataFlow("Cluster Flow", clusterVersionedFlow.getRootGroup(), toSet(clusterVersionedFlow.getControllerServices()),
+            toSet(clusterVersionedFlow.getReportingTasks()), toSet(clusterVersionedFlow.getParameterContexts()));
+
+        final FlowComparator flowComparator = new StandardFlowComparator(localDataFlow, clusterDataFlow, Collections.emptySet(), differenceDescriptor, encryptor::decrypt);
+        final FlowComparison flowComparison = flowComparator.compare();
+        return flowComparison;
+    }
+
+    private <T> Set<T> toSet(final List<T> values) {
+        if (values == null || values.isEmpty()) {
+            return new HashSet<>();
+        }
+
+        return new HashSet<>(values);
+    }
+
+    private VersionedDataflow createEmptyVersionedDataflow() {
+        final VersionedDataflow dataflow = new VersionedDataflow();
+        dataflow.setControllerServices(Collections.emptyList());
+        dataflow.setEncodingVersion(new VersionedFlowEncodingVersion(2, 0));
+        dataflow.setParameterContexts(Collections.emptyList());
+        dataflow.setRegistries(Collections.emptyList());
+        dataflow.setReportingTasks(Collections.emptyList());
+        dataflow.setRootGroup(new VersionedProcessGroup());
+        return dataflow;
+    }
+
+    private AffectedComponentSet determineAffectedComponents(final FlowComparison flowComparison, final FlowController controller) {
+        final List<FlowDifference> relevantDifferences = flowComparison.getDifferences().stream()
+            .filter(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS)
+            .collect(Collectors.toList());
+
+        logger.debug("The differences between Local Flow and Cluster Flow that are relevant for finding affected components are: {}", relevantDifferences);
+
+        final AffectedComponentSet affectedComponentSet = new AffectedComponentSet(controller);
+        for (final FlowDifference difference : relevantDifferences) {
+            affectedComponentSet.addAffectedComponents(difference);
+        }
+
+        logger.debug("Components affected by inheriting the flow are: {}", affectedComponentSet);
+        return affectedComponentSet;
+    }
+
+
+    private void inheritTemplates(final FlowController controller, final VersionedDataflow dataflow) {
+        if (dataflow.getTemplates() == null) {
+            return;
+        }
+
+        logger.debug("Synchronizing templates in dataflow");
+        final FlowManager flowManager = controller.getFlowManager();
+        for (final VersionedTemplate versionedTemplate : dataflow.getTemplates()) {
+            final ProcessGroup group = flowManager.getGroup(versionedTemplate.getGroupIdentifier());
+            if (group == null) {
+                logger.warn("Found Template for Process Group with ID {} but no Process Group exists with that ID", versionedTemplate.getGroupIdentifier());
+                continue;
+            }
+
+            group.addTemplate(new Template(versionedTemplate.getTemplateDto()));
+        }
+    }
+
+    private void inheritRegistries(final FlowController controller, final VersionedDataflow dataflow) {
+        final FlowRegistryClient registryClient = controller.getFlowRegistryClient();
+
+        for (final VersionedRegistry versionedRegistry : dataflow.getRegistries()) {
+            final FlowRegistry flowRegistry = registryClient.getFlowRegistry(versionedRegistry.getId());
+            if (flowRegistry == null) {
+                // Check if there's any registry with the same name. If so, remove it so that we can add this one in.
+                for (final String registryId : registryClient.getRegistryIdentifiers()) {
+                    final FlowRegistry registry = registryClient.getFlowRegistry(registryId);
+                    if (Objects.equals(versionedRegistry.getName(), registry.getName())) {
+                        registryClient.removeFlowRegistry(registryId);
+                        break;
+                    }
+                }
+
+                addRegistry(registryClient, versionedRegistry);
+            } else {
+                updateRegistry(flowRegistry, versionedRegistry);
+            }
+        }
+    }
+
+    private void addRegistry(final FlowRegistryClient registryClient, final VersionedRegistry versionedRegistry) {
+        registryClient.addFlowRegistry(versionedRegistry.getId(), versionedRegistry.getName(), versionedRegistry.getUrl(), versionedRegistry.getDescription());
+    }
+
+    private void updateRegistry(final FlowRegistry registry, final VersionedRegistry versionedRegistry) {
+        registry.setDescription(versionedRegistry.getDescription());
+        registry.setName(versionedRegistry.getName());
+        registry.setURL(versionedRegistry.getUrl());
+    }
+
+    private void inheritReportingTasks(final FlowController controller, final VersionedDataflow dataflow, final AffectedComponentSet affectedComponentSet) throws ReportingTaskInstantiationException {
+        for (final VersionedReportingTask versionedReportingTask : dataflow.getReportingTasks()) {
+            final ReportingTaskNode existing = controller.getReportingTaskNode(versionedReportingTask.getInstanceIdentifier());
+            if (existing == null) {
+                addReportingTask(controller, versionedReportingTask);
+            } else if (affectedComponentSet.isReportingTaskAffected(existing.getIdentifier())) {
+                updateReportingTask(existing, versionedReportingTask, controller);
+            }
+        }
+    }
+
+    private void addReportingTask(final FlowController controller, final VersionedReportingTask reportingTask) throws ReportingTaskInstantiationException {
+        final BundleCoordinate coordinate = createBundleCoordiate(reportingTask.getBundle(), reportingTask.getType());
+
+        final ReportingTaskNode taskNode = controller.createReportingTask(reportingTask.getType(), reportingTask.getInstanceIdentifier(), coordinate, false);
+        updateReportingTask(taskNode, reportingTask, controller);
+    }
+
+    private void updateReportingTask(final ReportingTaskNode taskNode, final VersionedReportingTask reportingTask, final FlowController controller) {
+        taskNode.setName(reportingTask.getName());
+        taskNode.setComments(reportingTask.getComments());
+        taskNode.setSchedulingPeriod(reportingTask.getSchedulingPeriod());
+        taskNode.setSchedulingStrategy(SchedulingStrategy.valueOf(reportingTask.getSchedulingStrategy()));
+
+        taskNode.setAnnotationData(reportingTask.getAnnotationData());
+        taskNode.setProperties(reportingTask.getProperties());
+
+        // enable/disable/start according to the ScheduledState
+        switch (reportingTask.getScheduledState()) {
+            case DISABLED:
+                if (taskNode.isRunning()) {
+                    controller.stopReportingTask(taskNode);
+                }
+                controller.disableReportingTask(taskNode);
+                break;
+            case ENABLED:
+                if (taskNode.getScheduledState() == org.apache.nifi.controller.ScheduledState.DISABLED) {
+                    controller.enableReportingTask(taskNode);
+                } else if (taskNode.isRunning()) {
+                    controller.stopReportingTask(taskNode);
+                }
+                break;
+            case RUNNING:
+                if (taskNode.getScheduledState() == org.apache.nifi.controller.ScheduledState.DISABLED) {
+                    controller.enableReportingTask(taskNode);
+                }
+                if (!taskNode.isRunning()) {
+                    controller.startReportingTask(taskNode);
+                }
+                break;
+        }
+    }
+
+    private void inheritParameterContexts(final FlowController controller, final VersionedDataflow dataflow) {
+        final ParameterContextManager parameterContextManager = controller.getFlowManager().getParameterContextManager();
+
+        // Add any parameter context that doesn't yet exist. We have to add all contexts before updating them because
+        // one context may reference another. We need that reference to exist before we try to create the reference.
+        final Map<String, ParameterContext> parameterContextsByName = parameterContextManager.getParameterContextNameMapping();
+
+        controller.getFlowManager().withParameterContextResolution(() -> {
+            for (final VersionedParameterContext versionedParameterContext : dataflow.getParameterContexts()) {
+                inheritParameterContext(versionedParameterContext, controller.getFlowManager(), parameterContextsByName);
+            }
+        });
+    }
+
+    private void inheritParameterContext(final VersionedParameterContext versionedParameterContext, final FlowManager flowManager, final Map<String, ParameterContext> parameterContextsByName) {
+        final ParameterContext existingContext = parameterContextsByName.get(versionedParameterContext.getName());
+        if (existingContext == null) {
+            addParameterContext(versionedParameterContext, flowManager);
+        } else {
+            updateParameterContext(versionedParameterContext, existingContext, flowManager);
+        }
+    }
+
+    private void addParameterContext(final VersionedParameterContext versionedParameterContext, final FlowManager flowManager) {
+        final Map<String, Parameter> parameters = createParameterMap(versionedParameterContext);
+
+        final ParameterContextManager contextManager = flowManager.getParameterContextManager();
+        final List<String> referenceIds = findReferencedParameterContextIds(versionedParameterContext, contextManager);
+
+        flowManager.createParameterContext(versionedParameterContext.getInstanceIdentifier(), versionedParameterContext.getName(), parameters, referenceIds);
+        logger.info("Added Parameter Context {}", versionedParameterContext.getName());
+    }
+
+    private List<String> findReferencedParameterContextIds(final VersionedParameterContext versionedParameterContext, final ParameterContextManager contextManager) {
+        final List<String> referenceIds = new ArrayList<>();
+        final Map<String, ParameterContext> parameterContextsByName = contextManager.getParameterContextNameMapping();
+
+        if (versionedParameterContext.getInheritedParameterContexts() != null) {
+            for (final String inheritedContextName : versionedParameterContext.getInheritedParameterContexts()) {
+                final ParameterContext existingContext = parameterContextsByName.get(inheritedContextName);
+                if (existingContext == null) {
+                    logger.warn("Parameter Context {} inherits from Parameter Context {} but cannot find a Parameter Context with name {}",
+                        versionedParameterContext.getName(), inheritedContextName, inheritedContextName);
+                } else {
+                    referenceIds.add(existingContext.getIdentifier());
+                }
+            }
+        }
+
+        return referenceIds;
+    }
+
+    private Map<String, Parameter> createParameterMap(final VersionedParameterContext versionedParameterContext) {
+        final Map<String, Parameter> parameters = new HashMap<>();
+        for (final VersionedParameter versioned : versionedParameterContext.getParameters()) {
+            final ParameterDescriptor descriptor = new ParameterDescriptor.Builder()
+                .description(versioned.getDescription())
+                .name(versioned.getName())
+                .sensitive(versioned.isSensitive())
+                .build();
+
+            final String parameterValue;
+            final String rawValue = versioned.getValue();
+            if (rawValue == null) {
+                parameterValue = null;
+            } else if (versioned.isSensitive() && rawValue.startsWith(ENCRYPTED_VALUE_PREFIX) && rawValue.endsWith(ENCRYPTED_VALUE_SUFFIX)) {
+                final String extractedValue = rawValue.substring(ENCRYPTED_VALUE_PREFIX.length(), rawValue.length() - ENCRYPTED_VALUE_SUFFIX.length());
+                parameterValue = encryptor.decrypt(extractedValue);
+            } else {
+                parameterValue = rawValue;
+            }
+
+            final Parameter parameter = new Parameter(descriptor, parameterValue);
+            parameters.put(versioned.getName(), parameter);
+        }
+
+        return parameters;
+    }
+
+    private void updateParameterContext(final VersionedParameterContext versionedParameterContext, final ParameterContext parameterContext, final FlowManager flowManager) {
+        final Map<String, Parameter> parameters = createParameterMap(versionedParameterContext);
+
+        final Map<String, String> currentValues = new HashMap<>();
+        parameterContext.getParameters().values().forEach(param -> currentValues.put(param.getDescriptor().getName(), param.getValue()));
+
+        if (logger.isDebugEnabled()) {
+            final Map<String, String> proposedValues = parameters.entrySet().stream()
+                .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getValue()));
+            logger.debug("For Parameter Context {}, current parameters = {}, proposed = {}", parameterContext.getName(), currentValues, proposedValues);
+        }
+
+        final Map<String, Parameter> updatedParameters = new HashMap<>();
+        final Set<String> proposedParameterNames = new HashSet<>();
+        for (final VersionedParameter parameter : versionedParameterContext.getParameters()) {
+            final String parameterName = parameter.getName();
+            final String currentValue = currentValues.get(parameterName);
+
+            proposedParameterNames.add(parameterName);
+            if (!Objects.equals(currentValue, parameter.getValue())) {
+                final Parameter updatedParameterObject = parameters.get(parameterName);
+                updatedParameters.put(parameterName, updatedParameterObject);
+            }
+        }
+
+        // If any parameters are removed, need to add a null value to the map in order to make sure that the parameter is removed.
+        for (final ParameterDescriptor existingParameterDescriptor : parameterContext.getParameters().keySet()) {
+            final String name = existingParameterDescriptor.getName();
+            if (!proposedParameterNames.contains(name)) {
+                updatedParameters.put(name, null);
+            }
+        }
+
+        if (updatedParameters.isEmpty()) {
+            logger.debug("No Parameters to update for Parameter Context {}", parameterContext.getName());
+        } else {
+            parameterContext.setParameters(updatedParameters);
+            logger.info("Updated the following Parameters for Parameter Context {}: {}", parameterContext.getName(), updatedParameters.keySet());
+        }
+
+        final ParameterContextManager contextManager = flowManager.getParameterContextManager();
+        final List<String> inheritedContextIds = findReferencedParameterContextIds(versionedParameterContext, contextManager);
+        final List<ParameterContext> referencedContexts = inheritedContextIds.stream()
+            .map(contextManager::getParameterContext)
+            .collect(Collectors.toList());
+        parameterContext.setInheritedParameterContexts(referencedContexts);
+    }
+
+    private void inheritControllerServices(final FlowController controller, final VersionedDataflow dataflow, final AffectedComponentSet affectedComponentSet) {
+        final FlowManager flowManager = controller.getFlowManager();
+
+        final Set<ControllerServiceNode> toEnable = new HashSet<>();
+        final Set<ControllerServiceNode> toDisable = new HashSet<>();
+
+        // We need to add any Controller Services that are not yet part of the flow. We must then
+        // update the Controller Services to match what is proposed. Finally, we can enable the services.
+        // We have to do this in 3 parts because if we just configure the Controller Service as we add it,
+        // we will have a situation where Service A references Service B. And if Service A is added first,
+        // Service B's references won't be updated. To avoid this, we create them all first, and then configure/update
+        // them so that when AbstractComponentNode#setProperty is called, it properly establishes that reference.
+        final List<VersionedControllerService> controllerServices = dataflow.getControllerServices();
+        for (final VersionedControllerService versionedControllerService : controllerServices) {
+            final ControllerServiceNode serviceNode = flowManager.getRootControllerService(versionedControllerService.getInstanceIdentifier());
+            if (serviceNode == null) {
+                addRootControllerService(controller, versionedControllerService);
+            }
+        }
+
+        for (final VersionedControllerService versionedControllerService : controllerServices) {
+            final ControllerServiceNode serviceNode = flowManager.getRootControllerService(versionedControllerService.getInstanceIdentifier());
+            if (affectedComponentSet.isControllerServiceAffected(serviceNode.getIdentifier())) {
+                updateRootControllerService(serviceNode, versionedControllerService);
+            }
+        }
+
+        for (final VersionedControllerService versionedControllerService : controllerServices) {
+            final ControllerServiceNode serviceNode = flowManager.getRootControllerService(versionedControllerService.getInstanceIdentifier());
+
+            if (versionedControllerService.getScheduledState() == ScheduledState.ENABLED) {
+                toEnable.add(serviceNode);
+            } else {
+                toDisable.add(serviceNode);
+            }
+        }
+
+        // Enable any Controller-level services that are intended to be enabled.
+        if (!toEnable.isEmpty()) {
+            controller.getControllerServiceProvider().enableControllerServices(toEnable);
+        }
+
+        // Disable any Controller-level services that are intended to be disabled.
+        if (!toDisable.isEmpty()) {
+            controller.getControllerServiceProvider().disableControllerServicesAsync(toDisable);
+        }
+    }
+
+    private void addRootControllerService(final FlowController controller, final VersionedControllerService versionedControllerService) {
+        final BundleCoordinate bundleCoordinate = createBundleCoordiate(versionedControllerService.getBundle(), versionedControllerService.getType());
+        final ControllerServiceNode serviceNode = controller.getFlowManager().createControllerService(versionedControllerService.getType(),
+            versionedControllerService.getInstanceIdentifier(), bundleCoordinate,Collections.emptySet(), true, true);
+
+        controller.getFlowManager().addRootControllerService(serviceNode);
+    }
+
+    private void updateRootControllerService(final ControllerServiceNode serviceNode, final VersionedControllerService versionedControllerService) {
+        serviceNode.pauseValidationTrigger();
+        try {
+            serviceNode.setName(versionedControllerService.getName());
+            serviceNode.setAnnotationData(versionedControllerService.getAnnotationData());
+            serviceNode.setComments(versionedControllerService.getComments());
+            serviceNode.setProperties(versionedControllerService.getProperties());
+        } finally {
+            serviceNode.resumeValidationTrigger();
+        }
+    }
+
+    private BundleCoordinate createBundleCoordiate(final Bundle bundle, final String componentType) {

Review comment:
       coordinate

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java
##########
@@ -0,0 +1,645 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.serialization;
+
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.AbstractComponentNode;
+import org.apache.nifi.controller.ComponentNode;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.UninheritableFlowException;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.flow.ComponentType;
+import org.apache.nifi.flow.ConnectableComponent;
+import org.apache.nifi.flow.ConnectableComponentType;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.parameter.ParameterContext;
+import org.apache.nifi.registry.flow.diff.DifferenceType;
+import org.apache.nifi.registry.flow.diff.FlowDifference;
+import org.apache.nifi.remote.RemoteGroupPort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * <p>
+ *     An AffectedComponentSet is a collection of components that will need to be enabled/disabled/started/stopped in order to facilitate
+ *     some set of changes to a dataflow.
+ * </p>
+ */
+public class AffectedComponentSet {
+    private static final Logger logger = LoggerFactory.getLogger(AffectedComponentSet.class);
+    private final FlowController flowController;
+    private final FlowManager flowManager;
+
+    private final Set<Port> inputPorts = new HashSet<>();
+    private final Set<Port> outputPorts = new HashSet<>();
+    private final Set<RemoteGroupPort> remoteInputPorts = new HashSet<>();
+    private final Set<RemoteGroupPort> remoteOutputPorts = new HashSet<>();
+    private final Set<ProcessorNode> processors = new HashSet<>();
+    private final Set<ControllerServiceNode> controllerServices = new HashSet<>();
+    private final Set<ReportingTaskNode> reportingTasks = new HashSet<>();
+
+    public AffectedComponentSet(final FlowController flowController) {
+        this.flowController = flowController;
+        this.flowManager = flowController.getFlowManager();
+    }
+
+    public void addInputPort(final Port port) {
+        if (port == null) {
+            return;
+        }
+
+        inputPorts.add(port);
+    }
+
+    public void addOutputPort(final Port port) {
+        if (port == null) {
+            return;
+        }
+
+        outputPorts.add(port);
+    }
+
+    public void addRemoteInputPort(final RemoteGroupPort port) {
+        if (port == null) {
+            return;
+        }
+
+        remoteInputPorts.add(port);
+    }
+
+    public void addRemoteOutputPort(final RemoteGroupPort port) {
+        if (port == null) {
+            return;
+        }
+
+        remoteOutputPorts.add(port);
+    }
+
+    public void addRemoteProcessGroup(final RemoteProcessGroup remoteProcessGroup) {
+        if (remoteProcessGroup == null) {
+            return;
+        }
+
+        remoteProcessGroup.getInputPorts().forEach(this::addRemoteInputPort);
+        remoteProcessGroup.getOutputPorts().forEach(this::addRemoteOutputPort);
+    }
+
+    public void addProcessor(final ProcessorNode processor) {
+        if (processor == null) {
+            return;
+        }
+
+        processors.add(processor);
+    }
+
+    public void addControllerService(final ControllerServiceNode controllerService) {
+        if (controllerService == null) {
+            return;
+        }
+
+        controllerServices.add(controllerService);
+
+        final List<ComponentNode> referencingComponents = controllerService.getReferences().findRecursiveReferences(ComponentNode.class);
+        for (final ComponentNode reference : referencingComponents) {
+            if (reference instanceof ControllerServiceNode) {
+                addControllerService((ControllerServiceNode) reference);
+            } else if (reference instanceof ProcessorNode) {
+                addProcessor((ProcessorNode) reference);
+            } else if (reference instanceof ReportingTaskNode) {
+                addReportingTask((ReportingTaskNode) reference);
+            }
+        }
+    }
+
+    public boolean isControllerServiceAffected(final String serviceId) {
+        for (final ControllerServiceNode serviceNode : controllerServices) {
+            if (serviceNode.getIdentifier().equals(serviceId)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private void addControllerServiceWithoutReferences(final ControllerServiceNode controllerService) {
+        if (controllerService == null) {
+            return;
+        }
+
+        controllerServices.add(controllerService);
+    }
+
+    public void addReportingTask(final ReportingTaskNode task) {
+        if (task == null) {
+            return;
+        }
+
+        reportingTasks.add(task);
+    }
+
+    public boolean isReportingTaskAffected(final String reportingTaskId) {
+        for (final ReportingTaskNode taskNode : reportingTasks) {
+            if (taskNode.getIdentifier().equals(reportingTaskId)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    public void addConnection(final Connection connection) {
+        if (connection == null) {
+            return;
+        }
+
+        addConnectable(connection.getSource());
+        addConnectable(connection.getDestination());
+    }
+
+    public void addConnectable(final Connectable connectable) {
+        if (connectable == null) {
+            return;
+        }
+
+        switch (connectable.getConnectableType()) {
+            case INPUT_PORT:
+                addInputPort((Port) connectable);
+                break;
+            case OUTPUT_PORT:
+                addOutputPort((Port) connectable);
+                break;
+            case PROCESSOR:
+                addProcessor((ProcessorNode) connectable);
+                break;
+            case REMOTE_INPUT_PORT:
+                addRemoteInputPort((RemoteGroupPort) connectable);
+                break;
+            case REMOTE_OUTPUT_PORT:
+                addRemoteOutputPort((RemoteGroupPort) connectable);
+        }
+    }
+
+    /**
+     * Adds any component that is affected by the given Flow Difference
+     * @param difference the Flow Difference
+     */
+    public void addAffectedComponents(final FlowDifference difference) {
+        final DifferenceType differenceType = difference.getDifferenceType();
+
+        if (differenceType == DifferenceType.COMPONENT_ADDED) {
+            // The component doesn't exist. But if it's a connection, the source or the destination might. And those need to be accounted for.
+            if (difference.getComponentB().getComponentType() == ComponentType.CONNECTION) {
+                addComponentsForNewConnection((VersionedConnection) difference.getComponentB());
+            }
+
+            return;
+        }
+
+        if (differenceType == DifferenceType.PARAMETER_VALUE_CHANGED || differenceType == DifferenceType.PARAMETER_DESCRIPTION_CHANGED || differenceType == DifferenceType.PARAMETER_REMOVED) {
+            addComponentsForParameterUpdate(difference);
+            return;
+        }
+
+        if (differenceType == DifferenceType.PARAMETER_CONTEXT_CHANGED) {
+            addComponentsForParameterContextChange(difference);
+            return;
+        }
+
+        if (differenceType == DifferenceType.INHERITED_CONTEXTS_CHANGED) {
+            addComponentsForInheritedParameterContextChange(difference);
+        }
+
+        if (differenceType == DifferenceType.VARIABLE_CHANGED || differenceType == DifferenceType.VARIABLE_ADDED || differenceType == DifferenceType.VARIABLE_REMOVED) {
+            addComponentsForVariableChange(difference.getComponentA().getInstanceIdentifier(), difference.getFieldName().orElse(null));
+            return;
+        }
+
+        if (differenceType == DifferenceType.RPG_URL_CHANGED) {
+            final String instanceId = difference.getComponentA().getInstanceIdentifier();
+            final RemoteProcessGroup rpg = flowManager.getRootGroup().findRemoteProcessGroup(instanceId);
+            if (rpg != null) {
+                addRemoteProcessGroup(rpg);
+            }
+        }
+
+        if (differenceType == DifferenceType.COMPONENT_REMOVED && difference.getComponentA().getComponentType() == ComponentType.PROCESS_GROUP) {
+            // If a Process Group is removed, we need to consider any component within the Process Group as affected also
+            addAllComponentsWithinGroup(difference.getComponentA().getInstanceIdentifier());
+        }
+
+        addAffectedComponents(difference.getComponentA());
+    }
+
+    private void addAllComponentsWithinGroup(final String groupId) {
+        final ProcessGroup processGroup = flowManager.getGroup(groupId);
+        if (processGroup == null) {
+            return;
+        }
+
+        processGroup.getProcessors().forEach(this::addProcessor);
+        processGroup.getControllerServices(false).forEach(this::addControllerServiceWithoutReferences);
+        processGroup.getInputPorts().forEach(this::addInputPort);
+        processGroup.getOutputPorts().forEach(this::addOutputPort);
+        processGroup.getRemoteProcessGroups().forEach(this::addRemoteProcessGroup);
+        processGroup.getProcessGroups().forEach(child -> addAllComponentsWithinGroup(child.getIdentifier()));
+    }
+
+    private void addComponentsForVariableChange(final String groupId, final String variableName) {
+        if (groupId == null || variableName == null) {
+            return;
+        }
+
+        final ProcessGroup group = flowManager.getGroup(groupId);
+        if (group == null) {
+            return;
+        }
+
+        final Set<ComponentNode> affectedComponents = group.getComponentsAffectedByVariable(variableName);
+        for (final ComponentNode component : affectedComponents) {
+            if (component instanceof ProcessorNode) {
+                addProcessor((ProcessorNode) component);
+            } else if (component instanceof ControllerServiceNode) {
+                addControllerService((ControllerServiceNode) component);
+            }
+        }
+    }
+
+    private void addComponentsForInheritedParameterContextChange(final FlowDifference difference) {
+        // If the inherited parameter contexts have changed, any component referencing a parameter in that context is affected.
+        final String parameterContextId = difference.getComponentA().getInstanceIdentifier();
+        final ParameterContext context = flowManager.getParameterContextManager().getParameterContext(parameterContextId);
+        if (context == null) {
+            return;
+        }
+
+        final Set<ProcessGroup> boundGroups = context.getParameterReferenceManager().getProcessGroupsBound(context);
+        for (final ProcessGroup group : boundGroups) {
+            group.getProcessors().stream()
+                .filter(AbstractComponentNode::isReferencingParameter)
+                .forEach(this::addProcessor);
+
+            group.getControllerServices(false).stream()
+                .filter(ComponentNode::isReferencingParameter)
+                .forEach(this::addControllerService);
+        }
+    }
+
+    private void addComponentsForParameterContextChange(final FlowDifference difference) {
+        // When the parameter context that a PG is bound to is updated, any component referencing a parameter is affected.
+        final String groupId = difference.getComponentA().getInstanceIdentifier();
+        final ProcessGroup group = flowManager.getGroup(groupId);
+        if (group == null) {
+            return;
+        }
+
+        group.getProcessors().stream()
+            .filter(AbstractComponentNode::isReferencingParameter)
+            .forEach(this::addProcessor);
+
+        group.getControllerServices(false).stream()
+            .filter(ComponentNode::isReferencingParameter)
+            .forEach(this::addControllerService);
+    }
+
+    private void addComponentsForParameterUpdate(final FlowDifference difference) {
+        final DifferenceType differenceType = difference.getDifferenceType();
+
+        final Optional<String> optionalParameterName = difference.getFieldName();
+        if (!optionalParameterName.isPresent()) {
+            logger.warn("Encountered a Flow Difference {} with Difference Type of {} but no indication as to which parameter was updated.", difference, differenceType);
+            return;
+        }
+
+        final String parameterName = optionalParameterName.get();
+        final String contextId = difference.getComponentA().getInstanceIdentifier();
+        final ParameterContext parameterContext = flowManager.getParameterContextManager().getParameterContext(contextId);
+        if (parameterContext == null) {
+            logger.warn("Encountered a Flow Difference {} with a Difference Type of {} but found no Parameter Context with Instance ID {}", difference, differenceType, contextId);
+            return;
+        }
+
+        final Set<ControllerServiceNode> referencingServices = parameterContext.getParameterReferenceManager().getControllerServicesReferencing(parameterContext, parameterName);
+        final Set<ProcessorNode> referencingProcessors = parameterContext.getParameterReferenceManager().getProcessorsReferencing(parameterContext, parameterName);
+
+        referencingServices.forEach(this::addControllerService);
+        referencingProcessors.forEach(this::addProcessor);
+    }
+
+    private void addComponentsForNewConnection(final VersionedConnection connection) {
+        final ConnectableComponent sourceComponent = connection.getSource();
+        final Connectable sourceConnectable = getConnectable(sourceComponent.getType(), sourceComponent.getInstanceIdentifier());
+        if (sourceConnectable != null) {
+            addConnectable(sourceConnectable);
+        }
+
+        final ConnectableComponent destinationComponent = connection.getDestination();
+        final Connectable destinationConnectable = getConnectable(destinationComponent.getType(), destinationComponent.getInstanceIdentifier());
+        if (destinationConnectable != null) {
+            addConnectable(destinationConnectable);
+        }
+    }
+
+    private Connectable getConnectable(final ConnectableComponentType type, final String identifier) {
+        switch (type) {
+            case FUNNEL:
+                return flowManager.getFunnel(identifier);
+            case INPUT_PORT:
+                return flowManager.getInputPort(identifier);
+            case OUTPUT_PORT:
+                return flowManager.getOutputPort(identifier);
+            case PROCESSOR:
+                return flowManager.getProcessorNode(identifier);
+            case REMOTE_INPUT_PORT:
+            case REMOTE_OUTPUT_PORT:
+                return flowManager.getRootGroup().findRemoteGroupPort(identifier);
+            default:
+                return null;
+        }
+    }
+
+    private void addAffectedComponents(final VersionedComponent versionedComponent) {
+        final String componentId = versionedComponent.getInstanceIdentifier();
+        switch (versionedComponent.getComponentType()) {
+            case CONNECTION:
+                addConnection(flowManager.getConnection(componentId));
+                break;
+            case CONTROLLER_SERVICE:
+                addControllerService(flowManager.getControllerServiceNode(componentId));
+                break;
+            case INPUT_PORT:
+                addInputPort(flowManager.getInputPort(componentId));
+                break;
+            case OUTPUT_PORT:
+                addOutputPort(flowManager.getOutputPort(componentId));
+                break;
+            case PROCESS_GROUP:
+                break;
+            case PROCESSOR:
+                addProcessor(flowManager.getProcessorNode(componentId));
+                break;
+            case REMOTE_INPUT_PORT:
+                final RemoteGroupPort remoteInputPort = flowManager.getRootGroup().findRemoteGroupPort(componentId);
+                if (remoteInputPort != null) {
+                    addRemoteInputPort(remoteInputPort);
+                }
+                break;
+            case REMOTE_OUTPUT_PORT:
+                final RemoteGroupPort remoteOutputPort = flowManager.getRootGroup().findRemoteGroupPort(componentId);
+                if (remoteOutputPort != null) {
+                    addRemoteOutputPort(remoteOutputPort);
+                }
+                break;
+            case REMOTE_PROCESS_GROUP:
+                addRemoteProcessGroup(flowManager.getRootGroup().findRemoteProcessGroup(componentId));
+                break;
+            case REPORTING_TASK:
+                addReportingTask(flowManager.getReportingTaskNode(componentId));
+                break;
+        }
+    }
+
+    /**
+     * Returns a new AffectedComponentSet that represents only those components that are currently active. A component is considered active if it is an Input/Output Port
+     * and is running, is a Processor or reporting task that has at least one active thread or a scheduled state of RUNNING or STARTING, or is a Controller Service that is
+     * ENABLED or ENABLING.
+     *
+     * @return an AffectedComponentSet that represents all components within this AffectedComponentSet that are currently active. The components contained by the returned AffectedComponentSet
+     * will always be a subset or equal to the set of components contained by this.
+     */
+    public AffectedComponentSet toActiveSet() {
+        final AffectedComponentSet active = new AffectedComponentSet(flowController);
+        inputPorts.stream().filter(port -> port.getScheduledState() == ScheduledState.RUNNING).forEach(active::addInputPort);
+        outputPorts.stream().filter(port -> port.getScheduledState() == ScheduledState.RUNNING).forEach(active::addOutputPort);
+        remoteInputPorts.stream().filter(port -> port.getScheduledState() == ScheduledState.RUNNING).forEach(active::addRemoteInputPort);
+        remoteOutputPorts.stream().filter(port -> port.getScheduledState() == ScheduledState.RUNNING).forEach(active::addRemoteOutputPort);
+
+        processors.stream().filter(this::isActive).forEach(active::addProcessor);
+        reportingTasks.stream().filter(task -> task.getScheduledState() == ScheduledState.STARTING || task.getScheduledState() == ScheduledState.RUNNING || task.isRunning())
+            .forEach(active::addReportingTask);
+        controllerServices.stream().filter(service -> service.getState() == ControllerServiceState.ENABLING || service.getState() == ControllerServiceState.ENABLED)
+            .forEach(active::addControllerServiceWithoutReferences);
+
+        return active;
+    }
+
+    private boolean isActive(final ProcessorNode processor) {
+        // We consider component active if it's starting, running, or has active threads. The call to ProcessorNode.isRunning() will only return true if it has active threads or a scheduled
+        // state of RUNNING but not if it has a scheduled state of STARTING.
+        final ScheduledState scheduledState = processor.getPhysicalScheduledState();
+        return scheduledState == ScheduledState.STARTING || scheduledState == ScheduledState.RUNNING || processor.isRunning();
+    }
+
+    private boolean isStopped(final ProcessorNode processor) {
+        final ScheduledState state = processor.getPhysicalScheduledState();
+        final boolean stateCorrect = state == ScheduledState.STOPPED || state == ScheduledState.DISABLED;
+        return stateCorrect && !processor.isRunning();
+    }
+
+    public void start() {
+        logger.info("Starting the following components: {}", this);
+        flowController.getControllerServiceProvider().enableControllerServices(controllerServices);
+
+        inputPorts.forEach(port -> port.getProcessGroup().startInputPort(port));
+        outputPorts.forEach(port -> port.getProcessGroup().startOutputPort(port));
+        remoteInputPorts.forEach(port -> port.getRemoteProcessGroup().startTransmitting(port));
+        remoteOutputPorts.forEach(port -> port.getRemoteProcessGroup().startTransmitting(port));
+        processors.forEach(processor -> processor.getProcessGroup().startProcessor(processor, false));
+        reportingTasks.forEach(flowController::startReportingTask);
+    }
+
+    /**
+     * Returns a new AffectedComponentSet that represents only those components that currently exist within the NiFi instance. When a set of dataflow updates have occurred, it is very possible
+     * that one or more components referred to by the AffectedComponentSet no longer exist (for example, there was a dataflow update that removed a Processor, so that Processor no longer exists).
+     *
+     * @return an AffectedComponentSet that represents all components within this AffectedComponentSet that currently exist within the NiFi instance. The components contained by the returned
+     * AffectedComponentSetwill always be a subset or equal to the set of components contained by this.
+     */
+    public AffectedComponentSet toExistingSet() {
+        final ControllerServiceProvider serviceProvider = flowController.getControllerServiceProvider();
+
+        final AffectedComponentSet existing = new AffectedComponentSet(flowController);
+        inputPorts.stream().filter(port -> port.getProcessGroup().getInputPort(port.getIdentifier()) != null).forEach(existing::addInputPort);
+        outputPorts.stream().filter(port -> port.getProcessGroup().getOutputPort(port.getIdentifier()) != null).forEach(existing::addOutputPort);
+        remoteInputPorts.stream().filter(port -> port.getProcessGroup().findRemoteGroupPort(port.getIdentifier()) != null).forEach(existing::addRemoteInputPort);
+        remoteOutputPorts.stream().filter(port -> port.getProcessGroup().findRemoteGroupPort(port.getIdentifier()) != null).forEach(existing::addRemoteOutputPort);
+        processors.stream().filter(processor -> processor.getProcessGroup().getProcessor(processor.getIdentifier()) != null).forEach(existing::addProcessor);
+        reportingTasks.stream().filter(task -> flowController.getReportingTaskNode(task.getIdentifier()) != null).forEach(existing::addReportingTask);
+        controllerServices.stream().filter(service -> serviceProvider.getControllerServiceNode(service.getIdentifier()) != null).forEach(existing::addControllerServiceWithoutReferences);
+
+        return existing;
+    }
+
+
+    /**
+     * Returns a new AffectedComponentSet that represents only those components that currently can be started. When a set of dataflow updates have occurred, it is very possible
+     * that one or more components referred to by the AffectedComponentSet can no longer be started (for example, there was a dataflow update that disabled a Processor that previously was running).
+     *
+     * @return an AffectedComponentSet that represents all components within this AffectedComponentSet that currently exist within the NiFi instance. The components contained by the returned
+     * AffectedComponentSetwill always be a subset or equal to the set of components contained by this.

Review comment:
       AffectedComponentSet will




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org