You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@nifi.apache.org by bb...@apache.org on 2022/04/21 14:44:24 UTC

[nifi] branch main updated: NIFI-9940: Renamed StandardProcessGroupSynchronizer to StandardVersionedComponentSynchronizer. Added synching for processors, input/output ports, connections, etc. Added unit tests.

This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 5a3c7d1fbd NIFI-9940: Renamed StandardProcessGroupSynchronizer to StandardVersionedComponentSynchronizer. Added synching for processors, input/output ports, connections, etc. Added unit tests.
5a3c7d1fbd is described below

commit 5a3c7d1fbd78e78c79d206e34b6f30e25b108faf
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Thu Mar 3 15:41:59 2022 -0500

    NIFI-9940: Renamed StandardProcessGroupSynchronizer to StandardVersionedComponentSynchronizer. Added synching for processors, input/output ports, connections, etc. Added unit tests.
    
    This closes #5983
---
 .../nifi/controller/StandardProcessorNode.java     |   24 +-
 .../service/StandardControllerServiceProvider.java |   45 +-
 .../FlowSynchronizationException.java              |   32 +
 .../StandardVersionedComponentSynchronizer.java}   | 1445 ++++++++++++++++++--
 .../VersionedComponentSynchronizer.java            |  201 +++
 .../VersionedFlowSynchronizationContext.java}      |   12 +-
 .../nifi/groups/DefaultComponentScheduler.java     |    5 +
 .../nifi/groups/ProcessGroupSynchronizer.java      |   36 -
 .../RetainExistingStateComponentScheduler.java     |    6 +
 .../apache/nifi/groups/StandardProcessGroup.java   |   20 +-
 .../nifi/remote/StandardRemoteProcessGroup.java    |    2 +-
 ...StandardVersionedComponentSynchronizerTest.java | 1025 ++++++++++++++
 .../apache/nifi/controller/ProcessScheduler.java   |   16 +-
 .../apache/nifi/controller/ReportingTaskNode.java  |    7 +
 .../service/ControllerServiceProvider.java         |   10 +-
 .../nifi/groups/AbstractComponentScheduler.java    |   26 +-
 .../org/apache/nifi/groups/ComponentScheduler.java |    6 +
 ...ptions.java => FlowSynchronizationOptions.java} |   61 +-
 .../java/org/apache/nifi/groups/ProcessGroup.java  |    2 +-
 .../reporting/StandardReportingTaskNode.java       |   25 +-
 .../scheduling/StandardProcessScheduler.java       |    8 +-
 .../serialization/VersionedFlowSynchronizer.java   |    8 +-
 .../controller/service/mock/MockProcessGroup.java  |    4 +-
 .../web/dao/impl/StandardControllerServiceDAO.java |    2 +-
 .../reporting/StatelessReportingTaskNode.java      |   20 +
 .../scheduling/StatelessProcessScheduler.java      |    3 +-
 26 files changed, 2842 insertions(+), 209 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index f743ac530b..de860a7b95 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -185,12 +185,11 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
         this.processorRef = new AtomicReference<>(processorDetails);
 
         identifier = new AtomicReference<>(uuid);
-        destinations = new HashMap<>();
-        connections = new HashMap<>();
+        destinations = new ConcurrentHashMap<>();
+        connections = new ConcurrentHashMap<>();
         incomingConnections = new AtomicReference<>(new ArrayList<>());
         lossTolerant = new AtomicBoolean(false);
-        final Set<Relationship> emptySetOfRelationships = new HashSet<>();
-        undefinedRelationshipsToTerminate = new AtomicReference<>(emptySetOfRelationships);
+        undefinedRelationshipsToTerminate = new AtomicReference<>(Collections.emptySet());
         comments = new AtomicReference<>("");
         schedulingPeriod = new AtomicReference<>("0 sec");
         schedulingNanos = new AtomicLong(MINIMUM_SCHEDULING_NANOS);
@@ -405,11 +404,8 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
 
     @Override
     public boolean isAutoTerminated(final Relationship relationship) {
-        if (relationship.isAutoTerminated() && getConnections(relationship).isEmpty()) {
-            return true;
-        }
-        final Set<Relationship> terminatable = undefinedRelationshipsToTerminate.get();
-        return terminatable == null ? false : terminatable.contains(relationship);
+        final boolean markedAutoTerminate = relationship.isAutoTerminated() || undefinedRelationshipsToTerminate.get().contains(relationship);
+        return markedAutoTerminate && getConnections(relationship).isEmpty();
     }
 
     @Override
@@ -418,13 +414,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
             throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
         }
 
-        for (final Relationship rel : terminate) {
-            if (!getConnections(rel).isEmpty()) {
-                throw new IllegalStateException("Cannot mark relationship '" + rel.getName()
-                        + "' as auto-terminated because Connection already exists with this relationship");
-            }
-        }
-
         undefinedRelationshipsToTerminate.set(new HashSet<>(terminate));
         LOG.debug("Resetting Validation State of {} due to setting auto-terminated relationships", this);
         resetValidationState();
@@ -726,8 +715,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
     @Override
     public Set<Connection> getConnections(final Relationship relationship) {
         final Set<Connection> applicableConnections = connections.get(relationship);
-        return (applicableConnections == null) ? Collections.<Connection> emptySet()
-                : Collections.unmodifiableSet(applicableConnections);
+        return (applicableConnections == null) ? Collections.emptySet() : Collections.unmodifiableSet(applicableConnections);
     }
 
     @Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index a8e032a73a..3e7cfdf5aa 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -25,9 +25,12 @@ import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.events.BulletinFactory;
+import org.apache.nifi.groups.ComponentScheduler;
+import org.apache.nifi.groups.DefaultComponentScheduler;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.logging.LogRepositoryFactory;
 import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.registry.flow.mapping.VersionedComponentStateLookup;
 import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.reporting.Severity;
 import org.slf4j.Logger;
@@ -36,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -96,37 +100,54 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
 
     @Override
     public Set<ComponentNode> scheduleReferencingComponents(final ControllerServiceNode serviceNode) {
+        return scheduleReferencingComponents(serviceNode, null, new DefaultComponentScheduler(this, VersionedComponentStateLookup.IDENTITY_LOOKUP));
+    }
+
+    public Set<ComponentNode> scheduleReferencingComponents(final ControllerServiceNode serviceNode, final Set<ComponentNode> candidates, final ComponentScheduler componentScheduler) {
         // find all of the schedulable components (processors, reporting tasks) that refer to this Controller Service,
         // or a service that references this controller service, etc.
         final List<ProcessorNode> processors = serviceNode.getReferences().findRecursiveReferences(ProcessorNode.class);
         final List<ReportingTaskNode> reportingTasks = serviceNode.getReferences().findRecursiveReferences(ReportingTaskNode.class);
 
-        final Set<ComponentNode> updated = new HashSet<>();
-
         // verify that  we can start all components (that are not disabled) before doing anything
         for (final ProcessorNode node : processors) {
+            if (candidates != null && !candidates.contains(node)) {
+                continue;
+            }
+
             if (node.getScheduledState() != ScheduledState.DISABLED) {
                 node.verifyCanStart();
-                updated.add(node);
             }
         }
         for (final ReportingTaskNode node : reportingTasks) {
+            if (candidates != null && !candidates.contains(node)) {
+                continue;
+            }
+
             if (node.getScheduledState() != ScheduledState.DISABLED) {
                 node.verifyCanStart();
-                updated.add(node);
             }
         }
 
         // start all of the components that are not disabled
+        final Set<ComponentNode> updated = new HashSet<>();
         for (final ProcessorNode node : processors) {
+            if (candidates != null && !candidates.contains(node)) {
+                continue;
+            }
+
             if (node.getScheduledState() != ScheduledState.DISABLED) {
-                node.getProcessGroup().startProcessor(node, true);
+                componentScheduler.startComponent(node);
                 updated.add(node);
             }
         }
         for (final ReportingTaskNode node : reportingTasks) {
+            if (candidates != null && !candidates.contains(node)) {
+                continue;
+            }
+
             if (node.getScheduledState() != ScheduledState.DISABLED) {
-                processScheduler.schedule(node);
+                componentScheduler.startReportingTask(node);
                 updated.add(node);
             }
         }
@@ -135,13 +156,13 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
     }
 
     @Override
-    public Set<ComponentNode> unscheduleReferencingComponents(final ControllerServiceNode serviceNode) {
+    public Map<ComponentNode, Future<Void>> unscheduleReferencingComponents(final ControllerServiceNode serviceNode) {
         // find all of the schedulable components (processors, reporting tasks) that refer to this Controller Service,
         // or a service that references this controller service, etc.
         final List<ProcessorNode> processors = serviceNode.getReferences().findRecursiveReferences(ProcessorNode.class);
         final List<ReportingTaskNode> reportingTasks = serviceNode.getReferences().findRecursiveReferences(ReportingTaskNode.class);
 
-        final Set<ComponentNode> updated = new HashSet<>();
+        final Map<ComponentNode, Future<Void>> updated = new HashMap<>();
 
         // verify that  we can stop all components (that are running) before doing anything
         for (final ProcessorNode node : processors) {
@@ -158,14 +179,14 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
         // stop all of the components that are running
         for (final ProcessorNode node : processors) {
             if (node.getScheduledState() == ScheduledState.RUNNING) {
-                node.getProcessGroup().stopProcessor(node);
-                updated.add(node);
+                final Future<Void> future = node.getProcessGroup().stopProcessor(node);
+                updated.put(node, future);
             }
         }
         for (final ReportingTaskNode node : reportingTasks) {
             if (node.getScheduledState() == ScheduledState.RUNNING) {
-                processScheduler.unschedule(node);
-                updated.add(node);
+                final Future<Void> future = processScheduler.unschedule(node);
+                updated.put(node, future);
             }
         }
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/FlowSynchronizationException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/FlowSynchronizationException.java
new file mode 100644
index 0000000000..974d15b1fe
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/FlowSynchronizationException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.flow.synchronization;
+
+public class FlowSynchronizationException extends Exception {
+    public FlowSynchronizationException(final String message) {
+        super(message);
+    }
+
+    public FlowSynchronizationException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+
+    public FlowSynchronizationException(final Throwable cause) {
+        super(cause);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
similarity index 63%
rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
index 8c5592dc18..d498d5fe38 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.nifi.groups;
+package org.apache.nifi.flow.synchronization;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.bundle.BundleCoordinate;
@@ -31,6 +31,7 @@ import org.apache.nifi.controller.BackoffMechanism;
 import org.apache.nifi.controller.ComponentNode;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.PropertyConfiguration;
+import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.Template;
 import org.apache.nifi.controller.exception.ProcessorInstantiationException;
@@ -39,6 +40,7 @@ import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.queue.LoadBalanceCompression;
 import org.apache.nifi.controller.queue.LoadBalanceStrategy;
 import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.encrypt.EncryptionException;
 import org.apache.nifi.flow.BatchSize;
@@ -52,19 +54,33 @@ import org.apache.nifi.flow.VersionedExternalFlow;
 import org.apache.nifi.flow.VersionedFlowCoordinates;
 import org.apache.nifi.flow.VersionedFunnel;
 import org.apache.nifi.flow.VersionedLabel;
+import org.apache.nifi.flow.VersionedParameter;
+import org.apache.nifi.flow.VersionedParameterContext;
 import org.apache.nifi.flow.VersionedPort;
 import org.apache.nifi.flow.VersionedProcessGroup;
 import org.apache.nifi.flow.VersionedProcessor;
-import org.apache.nifi.flow.VersionedPropertyDescriptor;
 import org.apache.nifi.flow.VersionedRemoteGroupPort;
 import org.apache.nifi.flow.VersionedRemoteProcessGroup;
+import org.apache.nifi.flow.VersionedReportingTask;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.groups.ComponentIdGenerator;
+import org.apache.nifi.groups.FlowFileConcurrency;
+import org.apache.nifi.groups.FlowFileOutboundPolicy;
+import org.apache.nifi.groups.FlowSynchronizationOptions;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.PropertyDecryptor;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
+import org.apache.nifi.groups.StandardVersionedFlowStatus;
 import org.apache.nifi.logging.LogLevel;
 import org.apache.nifi.parameter.Parameter;
 import org.apache.nifi.parameter.ParameterContext;
+import org.apache.nifi.parameter.ParameterContextManager;
 import org.apache.nifi.parameter.ParameterDescriptor;
+import org.apache.nifi.parameter.ParameterReferenceManager;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.registry.ComponentVariableRegistry;
 import org.apache.nifi.registry.VariableDescriptor;
 import org.apache.nifi.registry.client.NiFiRegistryException;
 import org.apache.nifi.registry.flow.FlowRegistry;
@@ -72,8 +88,6 @@ import org.apache.nifi.registry.flow.StandardVersionControlInformation;
 import org.apache.nifi.registry.flow.VersionControlInformation;
 import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
 import org.apache.nifi.registry.flow.VersionedFlowState;
-import org.apache.nifi.flow.VersionedParameter;
-import org.apache.nifi.flow.VersionedParameterContext;
 import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
 import org.apache.nifi.registry.flow.diff.DifferenceType;
 import org.apache.nifi.registry.flow.diff.FlowComparator;
@@ -97,6 +111,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URL;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -107,25 +122,29 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
+import java.util.function.BooleanSupplier;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronizer {
-    private static final Logger LOG = LoggerFactory.getLogger(StandardProcessGroupSynchronizer.class);
+public class StandardVersionedComponentSynchronizer implements VersionedComponentSynchronizer {
+    private static final Logger LOG = LoggerFactory.getLogger(StandardVersionedComponentSynchronizer.class);
     private static final String TEMP_FUNNEL_ID_SUFFIX = "-temp-funnel";
     public static final String ENC_PREFIX = "enc{";
     public static final String ENC_SUFFIX = "}";
 
-    private final ProcessGroupSynchronizationContext context;
+    private final VersionedFlowSynchronizationContext context;
     private final Set<String> updatedVersionedComponentIds = new HashSet<>();
 
     private Set<String> preExistingVariables = new HashSet<>();
-    private GroupSynchronizationOptions syncOptions;
+    private FlowSynchronizationOptions syncOptions;
 
-    public StandardProcessGroupSynchronizer(final ProcessGroupSynchronizationContext context) {
+    public StandardVersionedComponentSynchronizer(final VersionedFlowSynchronizationContext context) {
         this.context = context;
     }
 
@@ -138,13 +157,12 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
         this.updatedVersionedComponentIds.addAll(updatedVersionedComponentIds);
     }
 
-    public void setSynchronizationOptions(final GroupSynchronizationOptions syncOptions) {
+    public void setSynchronizationOptions(final FlowSynchronizationOptions syncOptions) {
         this.syncOptions = syncOptions;
     }
 
     @Override
-    public void synchronize(final ProcessGroup group, final VersionedExternalFlow versionedExternalFlow, final GroupSynchronizationOptions options) {
-
+    public void synchronize(final ProcessGroup group, final VersionedExternalFlow versionedExternalFlow, final FlowSynchronizationOptions options) {
         final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(context.getExtensionManager(), context.getFlowMappingOptions());
         final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(group, context.getControllerServiceProvider(), context.getFlowRegistryClient(), true);
 
@@ -228,7 +246,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
     }
 
     private void synchronize(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, VersionedParameterContext> versionedParameterContexts)
-                    throws ProcessorInstantiationException {
+        throws ProcessorInstantiationException {
 
         // Some components, such as Processors, may have a Scheduled State of RUNNING in the proposed flow. However, if we
         // transition the service into the RUNNING state, and then we need to update a Connection that is connected to it,
@@ -322,9 +340,6 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
         // 10. Update connections to match those in the proposed group
         // 11. Delete the temporary destination that was created above
 
-        // Keep track of any processors that have been updated to have auto-terminated relationships so that we can set those
-        // auto-terminated relationships after we've handled creating/deleting necessary connections.
-        final Map<ProcessorNode, Set<Relationship>> autoTerminatedRelationships = new HashMap<>();
 
         // During the flow update, we will use temporary names for process group ports. This is because port names must be
         // unique within a process group, but during an update we might temporarily be in a state where two ports have the same name.
@@ -378,7 +393,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
             synchronizeInputPorts(group, proposed, proposedPortFinalNames, inputPortsByVersionedId);
             synchronizeOutputPorts(group, proposed, proposedPortFinalNames, outputPortsByVersionedId);
             synchronizeLabels(group, proposed, labelsByVersionedId);
-            synchronizeProcessors(group, proposed, autoTerminatedRelationships, processorsByVersionedId);
+            synchronizeProcessors(group, proposed, processorsByVersionedId);
             synchronizeRemoteGroups(group, proposed, rpgsByVersionedId);
         } finally {
             // Make sure that we reset the connections
@@ -389,12 +404,6 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
         // We can now add in any necessary connections, since all connectable components have now been created.
         synchronizeConnections(group, proposed, connectionsByVersionedId);
 
-        // Once the appropriate connections have been removed, we may now update Processors' auto-terminated relationships.
-        // We cannot do this above, in the 'updateProcessor' call because if a connection is removed and changed to auto-terminated,
-        // then updating this in the updateProcessor call above would attempt to set the Relationship to being auto-terminated while a
-        // Connection for that relationship exists. This will throw an Exception.
-        autoTerminatedRelationships.forEach(ProcessorNode::setAutoTerminatedRelationships);
-
         // All ports have now been added/removed as necessary. We can now resolve the port names.
         updatePortsToFinalNames(proposedPortFinalNames);
 
@@ -428,10 +437,10 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
                 LOG.info("Added {} to {}", added, group);
             } else if (childCoordinates == null || syncOptions.isUpdateDescendantVersionedFlows()) {
 
-                final StandardProcessGroupSynchronizer sync = new StandardProcessGroupSynchronizer(context);
+                final StandardVersionedComponentSynchronizer sync = new StandardVersionedComponentSynchronizer(context);
                 sync.setPreExistingVariables(preExistingVariables);
                 sync.setUpdatedVersionedComponentIds(updatedVersionedComponentIds);
-                final GroupSynchronizationOptions options = GroupSynchronizationOptions.Builder.from(syncOptions)
+                final FlowSynchronizationOptions options = FlowSynchronizationOptions.Builder.from(syncOptions)
                     .updateGroupSettings(true)
                     .build();
 
@@ -456,8 +465,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
         for (final VersionedControllerService proposedService : proposed.getControllerServices()) {
             ControllerServiceNode service = servicesByVersionedId.get(proposedService.getIdentifier());
             if (service == null) {
-                service = addControllerService(group, proposedService.getIdentifier(), proposedService.getInstanceIdentifier(),
-                    proposedService.getType(), proposedService.getBundle(), context.getComponentIdGenerator());
+                service = addControllerService(group, proposedService, context.getComponentIdGenerator());
 
                 LOG.info("Added {} to {}", service, group);
                 servicesAdded.put(proposedService.getIdentifier(), service);
@@ -520,7 +528,6 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
             final Connection connection = connectionsByVersionedId.get(removedVersionedId);
             LOG.info("Removing {} from {}", connection, group);
             group.removeConnection(connection);
-            context.getFlowManager().onConnectionRemoved(connection);
         }
     }
 
@@ -634,7 +641,6 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
                 NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())), Function.identity()));
     }
 
-
     private <T> Map<String, T> componentsById(final ProcessGroup group, final Function<ProcessGroup, Collection<T>> retrieveComponents,
                                               final Function<T, String> retrieveId, final Function<T, Optional<String>> retrieveVersionedComponentId) {
 
@@ -643,7 +649,6 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
                 NiFiRegistryFlowMapper.generateVersionedComponentId(retrieveId.apply(component))), Function.identity()));
     }
 
-
     private void synchronizeFunnels(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, Funnel> funnelsByVersionedId) {
         for (final VersionedFunnel proposedFunnel : proposed.getFunnels()) {
             final Funnel funnel = funnelsByVersionedId.get(proposedFunnel.getIdentifier());
@@ -661,7 +666,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
     }
 
     private void synchronizeInputPorts(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<Port, String> proposedPortFinalNames,
-                                                              final Map<String, Port> inputPortsByVersionedId) {
+                                       final Map<String, Port> inputPortsByVersionedId) {
         for (final VersionedPort proposedPort : proposed.getInputPorts()) {
             final Port port = inputPortsByVersionedId.get(proposedPort.getIdentifier());
             if (port == null) {
@@ -803,7 +808,8 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
     }
 
     private <C, V extends VersionedComponent> void removeMissingComponents(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, C> componentsById,
-                                             final Function<VersionedProcessGroup, Collection<V>> getVersionedComponents, final BiConsumer<ProcessGroup, C> removeComponent) {
+                                                                           final Function<VersionedProcessGroup, Collection<V>> getVersionedComponents,
+                                                                           final BiConsumer<ProcessGroup, C> removeComponent) {
 
         // Determine the ID's of the components to remove. To do this, we get the ID's of all components in the Process Group,
         // and then remove from that the ID's of the components in the proposed group. That leaves us with the ID's of components
@@ -821,33 +827,16 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
         }
     }
 
+    private void synchronizeProcessors(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, ProcessorNode> processorsByVersionedId)
+                throws ProcessorInstantiationException {
 
-    private void synchronizeProcessors(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<ProcessorNode, Set<Relationship>> autoTerminatedRelationships,
-                                                                       final Map<String, ProcessorNode> processorsByVersionedId) throws ProcessorInstantiationException {
         for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) {
             final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier());
             if (processor == null) {
                 final ProcessorNode added = addProcessor(group, proposedProcessor, context.getComponentIdGenerator());
-                context.getFlowManager().onProcessorAdded(added);
-
-                final Set<Relationship> proposedAutoTerminated =
-                    proposedProcessor.getAutoTerminatedRelationships() == null ? Collections.emptySet() : proposedProcessor.getAutoTerminatedRelationships().stream()
-                        .map(added::getRelationship)
-                        .collect(Collectors.toSet());
-                autoTerminatedRelationships.put(added, proposedAutoTerminated);
                 LOG.info("Added {} to {}", added, group);
             } else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) {
                 updateProcessor(processor, proposedProcessor);
-
-                final Set<Relationship> proposedAutoTerminated =
-                    proposedProcessor.getAutoTerminatedRelationships() == null ? Collections.emptySet() : proposedProcessor.getAutoTerminatedRelationships().stream()
-                        .map(processor::getRelationship)
-                        .collect(Collectors.toSet());
-
-                if (!processor.getAutoTerminatedRelationships().equals(proposedAutoTerminated)) {
-                    autoTerminatedRelationships.put(processor, proposedAutoTerminated);
-                }
-
                 LOG.info("Updated {}", processor);
             } else {
                 processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY()));
@@ -855,6 +844,18 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
         }
     }
 
+    /**
+     * Translates the auto-terminated relationship names of the proposed processor into the corresponding
+     * Relationship objects of the given processor.
+     *
+     * @param processor the processor whose relationships are looked up by name
+     * @param proposedProcessor the versioned processor that supplies the auto-terminated relationship names
+     * @return the resolved relationships; an empty set if the proposed processor's set of names is null.
+     *         Names for which the processor returns a null Relationship are left out.
+     */
+    private Set<Relationship> getAutoTerminatedRelationships(final ProcessorNode processor, final VersionedProcessor proposedProcessor) {
+        final Set<String> proposedNames = proposedProcessor.getAutoTerminatedRelationships();
+        if (proposedNames == null) {
+            return Collections.emptySet();
+        }
+
+        final Set<Relationship> resolved = new HashSet<>();
+        for (final String proposedName : proposedNames) {
+            final Relationship relationship = processor.getRelationship(proposedName);
+            if (relationship != null) {
+                resolved.add(relationship);
+            }
+        }
+
+        return resolved;
+    }
+
     private void synchronizeRemoteGroups(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, RemoteProcessGroup> rpgsByVersionedId) {
         for (final VersionedRemoteProcessGroup proposedRpg : proposed.getRemoteProcessGroups()) {
             final RemoteProcessGroup rpg = rpgsByVersionedId.get(proposedRpg.getIdentifier());
@@ -1013,11 +1014,11 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
 
         destination.addProcessGroup(group);
 
-        final StandardProcessGroupSynchronizer sync = new StandardProcessGroupSynchronizer(context);
+        final StandardVersionedComponentSynchronizer sync = new StandardVersionedComponentSynchronizer(context);
         sync.setPreExistingVariables(variablesToSkip);
         sync.setUpdatedVersionedComponentIds(updatedVersionedComponentIds);
 
-        final GroupSynchronizationOptions options = GroupSynchronizationOptions.Builder.from(syncOptions)
+        final FlowSynchronizationOptions options = FlowSynchronizationOptions.Builder.from(syncOptions)
             .updateGroupSettings(true)
             .build();
         sync.setSynchronizationOptions(options);
@@ -1026,23 +1027,125 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
         return group;
     }
 
-    private ControllerServiceNode addControllerService(final ProcessGroup destination, final String versionedId, final String instanceId, final String type, final Bundle bundle,
-                                                                         final ComponentIdGenerator componentIdGenerator) {
-        final String id = componentIdGenerator.generateUuid(versionedId, instanceId, destination.getIdentifier());
-        LOG.debug("Adding Controller Service with ID {} of type {}", id, type);
+    /**
+     * Creates a Controller Service from the proposed versioned definition and adds it to its destination.
+     *
+     * @param destination the Process Group to add the service to, or null to add it as a root-level Controller Service
+     * @param proposed the versioned definition that supplies the type, bundle and versioned/instance identifiers
+     * @param componentIdGenerator generates the identifier of the new service
+     * @return the newly created service, with its versioned component id set to the proposed identifier
+     */
+    private ControllerServiceNode addControllerService(final ProcessGroup destination, final VersionedControllerService proposed, final ComponentIdGenerator componentIdGenerator) {
+        // With no destination group, the literal "Controller" stands in for the group id when generating the identifier.
+        final String destinationId = destination == null ? "Controller" : destination.getIdentifier();
+        final String identifier = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destinationId);
+        LOG.debug("Adding Controller Service with ID {} of type {}", identifier, proposed.getType());
 
-        final BundleCoordinate coordinate = toCoordinate(bundle);
-        final boolean firstTimeAdded = true;
+        final BundleCoordinate coordinate = toCoordinate(proposed.getBundle());
         final Set<URL> additionalUrls = Collections.emptySet();
+        // The first boolean argument was the local 'firstTimeAdded' flag in the previous revision of this method.
+        final ControllerServiceNode newService = context.getFlowManager().createControllerService(proposed.getType(), identifier, coordinate, additionalUrls, true, true, null);
+        newService.setVersionedComponentId(proposed.getIdentifier());
 
-        final ControllerServiceNode newService = context.getFlowManager().createControllerService(type, id, coordinate, additionalUrls, firstTimeAdded, true, null);
-        newService.setVersionedComponentId(versionedId);
-
-        destination.addControllerService(newService);
+        // A null destination means the service belongs to the root/controller level rather than to a Process Group.
+        if (destination == null) {
+            context.getFlowManager().addRootControllerService(newService);
+        } else {
+            destination.addControllerService(newService);
+        }
 
         return newService;
     }
 
+    /**
+     * Verifies that the given Controller Service is in a state that allows it to be synchronized with the proposed version.
+     *
+     * @param controllerService the existing service, or null if the proposed service does not exist yet
+     * @param proposed the proposed version of the service, or null if the service is to be removed
+     */
+    private void verifyCanSynchronize(final ControllerServiceNode controllerService, final VersionedControllerService proposed) {
+        if (controllerService == null) {
+            // Nothing exists yet, so synchronizing only means creating the proposed service; there is nothing to verify.
+            return;
+        }
+
+        if (proposed != null) {
+            // Both versions exist: the service will be updated in place.
+            controllerService.verifyCanUpdate();
+        } else {
+            // No proposed version: the service will be removed.
+            controllerService.verifyCanDelete();
+        }
+    }
+
+    /**
+     * Synchronizes a single Controller Service with the proposed version by removing, adding, or updating it.
+     * <p>
+     * If both {@code controllerService} and {@code proposed} are null, nothing is done. If only {@code proposed} is null,
+     * the existing service is removed. If only {@code controllerService} is null, the proposed service is added to
+     * {@code group}. Otherwise the existing service is updated to match the proposed version.
+     * </p>
+     *
+     * @param controllerService the existing service, or null if the service is to be added
+     * @param proposed the proposed version, or null if the service is to be removed
+     * @param group the Process Group that a newly added service is placed in
+     * @param synchronizationOptions supplies the component scheduler, the id generator, the stop timeout and the timeout action
+     * @throws FlowSynchronizationException if removing, adding, or updating the service throws any Exception
+     * @throws TimeoutException NOTE(review): presumably raised by stopControllerService when the deadline passes - confirm
+     * @throws InterruptedException NOTE(review): presumably raised by stopControllerService when interrupted while waiting - confirm
+     */
+    @Override
+    public void synchronize(final ControllerServiceNode controllerService, final VersionedControllerService proposed, final ProcessGroup group,
+                            final FlowSynchronizationOptions synchronizationOptions) throws FlowSynchronizationException, TimeoutException, InterruptedException {
+        if (controllerService == null && proposed == null) {
+            return;
+        }
+
+        setSynchronizationOptions(synchronizationOptions);
+
+        // Absolute deadline (System.currentTimeMillis() clock) by which components must have stopped.
+        final long timeout = System.currentTimeMillis() + synchronizationOptions.getComponentStopTimeout().toMillis();
+        final ControllerServiceProvider serviceProvider = context.getControllerServiceProvider();
+
+        // The scheduler stays paused for the whole operation and is resumed in the outer finally block.
+        synchronizationOptions.getComponentScheduler().pause();
+        try {
+            // Disable the controller service, if necessary, in order to update it.
+            final Set<ComponentNode> referencesToRestart = new HashSet<>();
+            final Set<ControllerServiceNode> servicesToRestart = new HashSet<>();
+
+            try {
+                // NOTE(review): stopControllerService presumably records what it stopped in the two sets passed in - confirm.
+                // Exceptions from these two calls propagate as-is; only the inner try below wraps them.
+                stopControllerService(controllerService, proposed, timeout, synchronizationOptions.getComponentStopTimeoutAction(), referencesToRestart, servicesToRestart);
+                verifyCanSynchronize(controllerService, proposed);
+
+                try {
+                    if (proposed == null) {
+                        serviceProvider.removeControllerService(controllerService);
+                        LOG.info("Successfully synchronized {} by removing it from the flow", controllerService);
+                    } else if (controllerService == null) {
+                        final ControllerServiceNode added = addControllerService(group, proposed, synchronizationOptions.getComponentIdGenerator());
+
+                        // The proposed version is enabled, so the new service is queued to be enabled in the finally block below.
+                        if (proposed.getScheduledState() == org.apache.nifi.flow.ScheduledState.ENABLED) {
+                            servicesToRestart.add(added);
+                        }
+
+                        LOG.info("Successfully synchronized {} by adding it to the flow", added);
+                    } else {
+                        updateControllerService(controllerService, proposed);
+
+                        // The proposed version is enabled, so the updated service is queued to be enabled in the finally block below.
+                        if (proposed.getScheduledState() == org.apache.nifi.flow.ScheduledState.ENABLED) {
+                            servicesToRestart.add(controllerService);
+                        }
+
+                        LOG.info("Successfully synchronized {} by updating it to match proposed version", controllerService);
+                    }
+                } catch (final Exception e) {
+                    // NOTE(review): when a service is being added, controllerService is null, so the message names the service as "null".
+                    throw new FlowSynchronizationException("Failed to synchronize Controller Service " + controllerService + " with proposed version", e);
+                }
+            } finally {
+                // This block runs whether or not the synchronization above succeeded.
+                // Re-enable the controller service if necessary
+                serviceProvider.enableControllerServicesAsync(servicesToRestart);
+
+                // Restart any components that need to be restarted.
+                // NOTE(review): this uses context.getComponentScheduler(), whereas pause()/resume() use the scheduler from
+                // synchronizationOptions - confirm that the two are meant to differ.
+                if (controllerService != null) {
+                    serviceProvider.scheduleReferencingComponents(controllerService, referencesToRestart, context.getComponentScheduler());
+                }
+            }
+        } finally {
+            synchronizationOptions.getComponentScheduler().resume();
+        }
+    }
+
+    /**
+     * Blocks until the given stop/disable Future completes or the deadline passes.
+     *
+     * @param future the Future that completes when the component has stopped or been disabled
+     * @param component the component being stopped; used in error messages and, if it is a ProcessorNode, for termination
+     * @param timeout the deadline as an absolute timestamp in milliseconds on the System.currentTimeMillis() clock
+     * @param timeoutAction what to do when a ProcessorNode has not stopped by the deadline
+     * @throws InterruptedException if interrupted while waiting
+     * @throws FlowSynchronizationException if the Future completed exceptionally; carries the underlying cause
+     * @throws TimeoutException if the deadline passes and either the component is not a ProcessorNode or the
+     *                          action is THROW_TIMEOUT_EXCEPTION
+     */
+    private void waitForStopCompletion(final Future<?> future, final Object component, final long timeout, final FlowSynchronizationOptions.ComponentStopTimeoutAction timeoutAction)
+        throws InterruptedException, FlowSynchronizationException, TimeoutException {
+        // The deadline is absolute, so convert it into the time remaining from now.
+        final long remainingMillis = timeout - System.currentTimeMillis();
+
+        try {
+            future.get(remainingMillis, TimeUnit.MILLISECONDS);
+        } catch (final TimeoutException timeoutException) {
+            // Only a processor can be terminated; any other component that fails to stop in time is reported as a timeout.
+            if (!(component instanceof ProcessorNode)) {
+                throw new TimeoutException("Timed out waiting for " + component + " to stop/disable");
+            }
+
+            switch (timeoutAction) {
+                case THROW_TIMEOUT_EXCEPTION:
+                    throw timeoutException;
+                case TERMINATE:
+                default:
+                    ((ProcessorNode) component).terminate();
+                    break;
+            }
+        } catch (final ExecutionException executionException) {
+            throw new FlowSynchronizationException("Failed to stop/disable " + component, executionException.getCause());
+        } catch (final InterruptedException interruptedException) {
+            throw new InterruptedException("Interrupted while waiting for " + component + " to stop/disable");
+        }
+    }
 
     private void updateControllerService(final ControllerServiceNode service, final VersionedControllerService proposed) {
         LOG.debug("Updating {}", service);
@@ -1053,7 +1156,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
             service.setComments(proposed.getComments());
             service.setName(proposed.getName());
 
-            final Map<String, String> properties = populatePropertiesMap(service, proposed.getProperties(), proposed.getPropertyDescriptors(), service.getProcessGroup());
+            final Map<String, String> properties = populatePropertiesMap(service, proposed.getProperties(), service.getProcessGroup());
             service.setProperties(properties, true);
 
             if (!isEqual(service.getBundleCoordinate(), proposed.getBundle())) {
@@ -1067,8 +1170,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
         }
     }
 
-    private Map<String, String> populatePropertiesMap(final ComponentNode componentNode, final Map<String, String> proposedProperties,
-                                                      final Map<String, VersionedPropertyDescriptor> proposedDescriptors, final ProcessGroup group) {
+    private Map<String, String> populatePropertiesMap(final ComponentNode componentNode, final Map<String, String> proposedProperties, final ProcessGroup group) {
 
         // Explicitly set all existing properties to null, except for sensitive properties, so that if there isn't an entry in the proposedProperties
         // it will get removed from the processor. We don't do this for sensitive properties because when we retrieve the VersionedProcessGroup from registry,
@@ -1088,11 +1190,10 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
                 .forEach(updatedPropertyNames::add);
 
             for (final String propertyName : updatedPropertyNames) {
-                final VersionedPropertyDescriptor descriptor = proposedDescriptors.get(propertyName);
+                final PropertyDescriptor descriptor = componentNode.getPropertyDescriptor(propertyName);
 
                 String value;
-                if (descriptor != null && descriptor.getIdentifiesControllerService()) {
-
+                if (descriptor != null && descriptor.getControllerServiceDefinition() != null ) {
                     // Need to determine if the component's property descriptor for this service is already set to an id
                     // of an existing service that is outside the current processor group, and if it is we want to leave
                     // the property set to that value
@@ -1174,6 +1275,434 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
         }
     }
 
+    /**
+     * Verifies that the given Parameter Context can be synchronized with the proposed version.
+     * <ul>
+     *   <li>Adding (no existing context): no Parameter Context with the proposed name may exist yet.</li>
+     *   <li>Removing (no proposed version): the check is delegated to verifyNotInherited for the existing context's identifier.</li>
+     *   <li>Updating: a parameter present in both versions must have the same sensitivity flag in each, and every
+     *       inherited Parameter Context named by the proposed version must exist.</li>
+     * </ul>
+     * At least one of the two arguments must be non-null.
+     *
+     * @param parameterContext the existing Parameter Context, or null if it is to be added
+     * @param proposed the proposed version, or null if the Parameter Context is to be removed
+     * @throws FlowSynchronizationException if the proposed name is already in use, a sensitivity flag differs,
+     *                                      or an inherited Parameter Context does not exist
+     */
+    private void verifyCanSynchronize(final ParameterContext parameterContext, final VersionedParameterContext proposed) throws FlowSynchronizationException {
+        if (parameterContext == null) {
+            // Adding: the proposed name must not already be in use.
+            final String proposedName = proposed.getName();
+            if (getParameterContextByName(proposedName) != null) {
+                throw new FlowSynchronizationException("Cannot synchronize flow with proposed Parameter Context because a Parameter Context already exists with the name " + proposedName);
+            }
+
+            return;
+        }
+
+        if (proposed == null) {
+            // Removing: must ensure that no other parameter contexts inherit from this one.
+            verifyNotInherited(parameterContext.getIdentifier());
+            return;
+        }
+
+        // Updating: a parameter that exists in both versions must carry the same sensitivity flag in each.
+        for (final VersionedParameter versionedParameter : proposed.getParameters()) {
+            final Optional<Parameter> existingParameter = parameterContext.getParameter(versionedParameter.getName());
+            if (!existingParameter.isPresent()) {
+                continue;
+            }
+
+            final boolean paramSensitive = existingParameter.get().getDescriptor().isSensitive();
+            if (paramSensitive == versionedParameter.isSensitive()) {
+                continue;
+            }
+
+            throw new FlowSynchronizationException("Cannot synchronize flow with proposed Parameter Context because the Parameter [" + versionedParameter.getName() + "] in " +
+                parameterContext + " has a sensitivity flag of " + paramSensitive + " while the proposed version has a sensitivity flag of " + versionedParameter.isSensitive());
+        }
+
+        // Updating: every inherited Parameter Context named by the proposed version must exist in the current flow.
+        final List<String> inheritedContextNames = proposed.getInheritedParameterContexts();
+        if (inheritedContextNames == null) {
+            return;
+        }
+
+        for (final String contextName : inheritedContextNames) {
+            if (getParameterContextByName(contextName) == null) {
+                throw new FlowSynchronizationException("Cannot synchronize flow with proposed Parameter Context because proposed version inherits from Parameter Context with name " +
+                    contextName + " but there is no Parameter Context with that name in the current flow");
+            }
+        }
+    }
+
+    /**
+     * Synchronizes a single Parameter Context with the proposed version by adding, removing, or updating it.
+     * <p>
+     * If both {@code parameterContext} and {@code proposed} are null, nothing is done. If only {@code parameterContext}
+     * is null, a new Parameter Context is created from the proposed version. Otherwise the processors and Controller
+     * Services that reference a changed parameter are stopped, the context is removed (when {@code proposed} is null)
+     * or updated, and the stopped components are started again.
+     * </p>
+     *
+     * @param parameterContext the existing Parameter Context, or null if it is to be added
+     * @param proposed the proposed version, or null if the Parameter Context is to be removed
+     * @param synchronizationOptions supplies the component scheduler, the id generator, the stop timeout and the timeout action
+     * @throws FlowSynchronizationException if verifyCanSynchronize rejects the change
+     * @throws TimeoutException NOTE(review): presumably raised while stopping components when the deadline passes - confirm
+     * @throws InterruptedException NOTE(review): presumably raised while stopping components when interrupted - confirm
+     */
+    @Override
+    public void synchronize(final ParameterContext parameterContext, final VersionedParameterContext proposed, final FlowSynchronizationOptions synchronizationOptions)
+        throws FlowSynchronizationException, TimeoutException, InterruptedException {
+
+        if (parameterContext == null && proposed == null) {
+            return;
+        }
+
+        // Absolute deadline (System.currentTimeMillis() clock) by which components must have stopped.
+        final long timeout = System.currentTimeMillis() + synchronizationOptions.getComponentStopTimeout().toMillis();
+        // Verification happens before the scheduler is paused, so a rejected change leaves the scheduler untouched.
+        verifyCanSynchronize(parameterContext, proposed);
+
+        synchronizationOptions.getComponentScheduler().pause();
+        try {
+            // Make sure that we have a unique name and add the Parameter Context if none exists
+            if (parameterContext == null) {
+                final String contextId = synchronizationOptions.getComponentIdGenerator().generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), "Controller");
+                final ParameterContext added = createParameterContext(proposed, contextId, Collections.emptyMap());
+                LOG.info("Successfully synchronized {} by adding it to the flow", added);
+                // Returning from inside the try still runs the outer finally, which resumes the scheduler.
+                return;
+            }
+
+            final ParameterReferenceManager referenceManager = parameterContext.getParameterReferenceManager();
+            final Set<String> updatedParameterNames = getUpdatedParameterNames(parameterContext, proposed);
+
+            final Set<ComponentNode> componentsToRestart = new HashSet<>();
+            final Set<ControllerServiceNode> servicesToRestart = new HashSet<>();
+            try {
+                // Stop components necessary
+                for (final String paramName : updatedParameterNames) {
+                    final Set<ProcessorNode> processors = referenceManager.getProcessorsReferencing(parameterContext, paramName);
+                    componentsToRestart.addAll(stopOrTerminate(processors, timeout, synchronizationOptions));
+
+                    final Set<ControllerServiceNode> referencingServices = referenceManager.getControllerServicesReferencing(parameterContext, paramName);
+
+                    for (final ControllerServiceNode referencingService : referencingServices) {
+                        stopControllerService(referencingService, null, timeout, synchronizationOptions.getComponentStopTimeoutAction(), componentsToRestart, servicesToRestart);
+                        // Every referencing service is queued for re-enabling here, regardless of what stopControllerService recorded.
+                        servicesToRestart.add(referencingService);
+                    }
+                }
+
+                // Remove or update parameter context.
+                final ParameterContextManager contextManager = context.getFlowManager().getParameterContextManager();
+                if (proposed == null) {
+                    // Unbind the context from every Process Group that uses it before removing it.
+                    for (final ProcessGroup groupBound : referenceManager.getProcessGroupsBound(parameterContext)) {
+                        groupBound.setParameterContext(null);
+                    }
+
+                    contextManager.removeParameterContext(parameterContext.getIdentifier());
+                    LOG.info("Successfully synchronized {} by removing it from the flow", parameterContext);
+                } else {
+                    final Map<String, Parameter> updatedParameters = createParameterMap(proposed.getParameters());
+
+                    // Resolve the inherited contexts by name. verifyCanSynchronize has already checked that each name exists.
+                    final Map<String, ParameterContext> contextsByName = contextManager.getParameterContextNameMapping();
+                    final List<ParameterContext> inheritedContexts = new ArrayList<>();
+                    final List<String> inheritedContextNames = proposed.getInheritedParameterContexts();
+                    if (inheritedContextNames != null) {
+                        for (final String inheritedContextName : inheritedContextNames) {
+                            final ParameterContext inheritedContext = contextsByName.get(inheritedContextName);
+                            inheritedContexts.add(inheritedContext);
+                        }
+                    }
+
+                    parameterContext.setParameters(updatedParameters);
+                    parameterContext.setName(proposed.getName());
+                    parameterContext.setDescription(proposed.getDescription());
+                    parameterContext.setInheritedParameterContexts(inheritedContexts);
+                    LOG.info("Successfully synchronized {} by updating it to match the proposed version", parameterContext);
+                }
+            } finally {
+                // This block runs whether or not the stop/remove/update steps above succeeded.
+                // TODO: How will this behave if Controller Service was changed to DISABLING but then timed out waiting for it to disable?
+                //       In that case, I think this will fail to enable the controller services, and as a result it will remain DISABLED.
+                //       We probably want to update the logic here so that it marks a desired state of ENABLED and when the service finally transitions
+                //       to DISABLED we enable it.
+                context.getControllerServiceProvider().enableControllerServicesAsync(servicesToRestart);
+
+                // We don't use ControllerServiceProvider.scheduleReferencingComponents here, as we do when dealing with a Controller Service
+                // because if we timeout while waiting for a Controller Service to stop, then that Controller Service won't be in our list of Controller Services
+                // to re-enable. As a result, we don't have the appropriate Controller Service to pass to the scheduleReferencingComponents.
+                for (final ComponentNode stoppedComponent : componentsToRestart) {
+                    // Only Connectable components are started here; anything else in the set is skipped.
+                    if (stoppedComponent instanceof Connectable) {
+                        context.getComponentScheduler().startComponent((Connectable) stoppedComponent);
+                    }
+                }
+            }
+        } finally {
+            synchronizationOptions.getComponentScheduler().resume();
+        }
+    }
+
+    /**
+     * Determines which parameters differ between the existing Parameter Context and the proposed version.
+     * A name is reported if it exists in only one of the two, or exists in both with different values.
+     *
+     * @param parameterContext the existing Parameter Context
+     * @param proposed the proposed version; if null, it is treated as having no parameters
+     * @return the names of all parameters that were added, removed, or changed in value
+     */
+    protected Set<String> getUpdatedParameterNames(final ParameterContext parameterContext, final VersionedParameterContext proposed) {
+        final Map<String, String> currentValues = new HashMap<>();
+        for (final Parameter parameter : parameterContext.getParameters().values()) {
+            currentValues.put(parameter.getDescriptor().getName(), parameter.getValue());
+        }
+
+        final Map<String, String> proposedValues = new HashMap<>();
+        if (proposed != null) {
+            for (final VersionedParameter versionedParameter : proposed.getParameters()) {
+                proposedValues.put(versionedParameter.getName(), versionedParameter.getValue());
+            }
+        }
+
+        final Set<String> updatedParameterNames = new HashSet<>();
+
+        // Existing parameters that are missing from the proposal or have a different proposed value.
+        for (final Map.Entry<String, String> current : currentValues.entrySet()) {
+            final String name = current.getKey();
+            if (!proposedValues.containsKey(name) || !Objects.equals(current.getValue(), proposedValues.get(name))) {
+                updatedParameterNames.add(name);
+            }
+        }
+
+        // Proposed parameters that do not exist yet. Value changes were already captured by the loop above.
+        for (final String proposedName : proposedValues.keySet()) {
+            if (!currentValues.containsKey(proposedName)) {
+                updatedParameterNames.add(proposedName);
+            }
+        }
+
+        return updatedParameterNames;
+    }
+
+    @Override
+    public void synchronizeProcessGroupSettings(final ProcessGroup processGroup, final VersionedProcessGroup proposed, final ProcessGroup parentGroup,
+                                                final FlowSynchronizationOptions synchronizationOptions)
+                    throws FlowSynchronizationException, TimeoutException, InterruptedException {
+
+        if (processGroup == null && proposed == null) {
+            return;
+        }
+
+        final long timeout = System.currentTimeMillis() + synchronizationOptions.getComponentStopTimeout().toMillis();
+
+        synchronizationOptions.getComponentScheduler().pause();
+        try {
+            // Check if we need to delete the Process Group
+            if (proposed == null) {
+                // Ensure that there are no incoming connections
+                processGroup.getInputPorts().forEach(Port::verifyCanDelete);
+
+                // Bleed out the data by stopping all input ports and source processors, then waiting
+                // for all connections to become empty
+                bleedOut(processGroup, timeout, synchronizationOptions);
+
+                processGroup.stopProcessing();
+                waitFor(timeout, () -> isDoneProcessing(processGroup));
+
+                // Disable all Controller Services
+                final Future<Void> disableServicesFuture = context.getControllerServiceProvider().disableControllerServicesAsync(processGroup.findAllControllerServices());
+                try {
+                    disableServicesFuture.get(timeout, TimeUnit.MILLISECONDS);
+                } catch (final ExecutionException ee) {
+                    throw new FlowSynchronizationException("Could not synchronize flow with proposal due to: failed to disable Controller Services", ee.getCause());
+                }
+
+                // Remove all templates from the group and remove the group
+                processGroup.getTemplates().forEach(processGroup::removeTemplate);
+                processGroup.getParent().removeProcessGroup(processGroup);
+
+                LOG.info("Successfully synchronized {} by removing it from the flow", processGroup);
+                return;
+            }
+
+            // Create the Process Group if it doesn't exist
+            final ProcessGroup groupToUpdate;
+            if (processGroup == null) {
+                final String groupId = synchronizationOptions.getComponentIdGenerator().generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), parentGroup.getIdentifier());
+                final ProcessGroup group = context.getFlowManager().createProcessGroup(groupId);
+                group.setVersionedComponentId(proposed.getIdentifier());
+                group.setParent(parentGroup);
+                group.setName(proposed.getName());
+
+                parentGroup.addProcessGroup(group);
+                groupToUpdate = group;
+            } else {
+                groupToUpdate = processGroup;
+            }
+
+            // Ensure that the referenced Parameter Context is valid
+            final ParameterContext parameterContext = proposed.getParameterContextName() == null ? null :
+                context.getFlowManager().getParameterContextManager().getParameterContextNameMapping().get(proposed.getParameterContextName());
+
+            if (parameterContext == null && proposed.getParameterContextName() != null) {
+                throw new FlowSynchronizationException("Cannot synchronize flow with proposed version because proposal indicates that Process Group " + groupToUpdate +
+                    " should use Parameter Context with name [" + proposed.getParameterContextName() + "] but no Parameter Context exists with that name");
+            }
+
+            // Determine which components must be stopped/disabled based on Parameter Context name changing
+            final Set<ProcessorNode> processorsToStop = new HashSet<>();
+            final Set<ControllerServiceNode> controllerServicesToStop = new HashSet<>();
+            final String currentParameterContextName = groupToUpdate.getParameterContext() == null ? null : groupToUpdate.getParameterContext().getName();
+            if (!Objects.equals(currentParameterContextName, proposed.getParameterContextName())) {
+                groupToUpdate.getProcessors().stream()
+                    .filter(ProcessorNode::isRunning)
+                    .filter(ProcessorNode::isReferencingParameter)
+                    .forEach(processorsToStop::add);
+
+                final Set<ControllerServiceNode> servicesReferencingParams = groupToUpdate.getControllerServices(false).stream()
+                    .filter(ControllerServiceNode::isReferencingParameter)
+                    .collect(Collectors.toSet());
+
+                for (final ControllerServiceNode service : servicesReferencingParams) {
+                    if (!service.isActive()) {
+                        continue;
+                    }
+
+                    controllerServicesToStop.add(service);
+
+                    for (final ControllerServiceNode referencingService : service.getReferences().findRecursiveReferences(ControllerServiceNode.class)) {
+                        if (!referencingService.isActive()) {
+                            continue;
+                        }
+
+                        controllerServicesToStop.add(referencingService);
+                    }
+                }
+
+                for (final ControllerServiceNode service : controllerServicesToStop) {
+                    service.getReferences().findRecursiveReferences(ProcessorNode.class).stream()
+                        .filter(ProcessorNode::isRunning)
+                        .forEach(processorsToStop::add);
+                }
+            }
+
+            // Determine which components must be stopped based on changes to Variable Registry.
+            final Set<String> updatedVariableNames = getUpdatedVariableNames(groupToUpdate.getVariableRegistry(), proposed.getVariables() == null ? Collections.emptyMap() : proposed.getVariables());
+            if (!updatedVariableNames.isEmpty()) {
+                for (final String variableName : updatedVariableNames) {
+                    final Set<ComponentNode> affectedComponents = groupToUpdate.getComponentsAffectedByVariable(variableName);
+                    for (final ComponentNode component : affectedComponents) {
+                        if (component instanceof ProcessorNode) {
+                            final ProcessorNode processor = (ProcessorNode) component;
+                            if (processor.isRunning()) {
+                                processorsToStop.add(processor);
+                            }
+                        } else if (component instanceof ControllerServiceNode) {
+                            final ControllerServiceNode service = (ControllerServiceNode) component;
+                            if (service.isActive()) {
+                                controllerServicesToStop.add(service);
+                            }
+                        }
+                    }
+                }
+            }
+
+            try {
+                // Stop all necessary running processors
+                stopOrTerminate(processorsToStop, timeout, synchronizationOptions);
+
+                // Stop all necessary enabled/active Controller Services
+                final Future<Void> serviceDisableFuture = context.getControllerServiceProvider().disableControllerServicesAsync(controllerServicesToStop);
+                try {
+                    serviceDisableFuture.get(timeout - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+                } catch (ExecutionException e) {
+                    throw new FlowSynchronizationException("Failed to disable Controller Services necessary in order to perform update of Process Group", e);
+                }
+
+                // Update the Process Group
+                groupToUpdate.setDefaultBackPressureDataSizeThreshold(proposed.getDefaultBackPressureDataSizeThreshold());
+                groupToUpdate.setDefaultBackPressureObjectThreshold(proposed.getDefaultBackPressureObjectThreshold());
+                groupToUpdate.setDefaultFlowFileExpiration(proposed.getDefaultFlowFileExpiration());
+                groupToUpdate.setFlowFileConcurrency(proposed.getFlowFileConcurrency() == null ? FlowFileConcurrency.UNBOUNDED : FlowFileConcurrency.valueOf(proposed.getFlowFileConcurrency()));
+                groupToUpdate.setFlowFileOutboundPolicy(proposed.getFlowFileOutboundPolicy() == null ? FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE :
+                    FlowFileOutboundPolicy.valueOf(proposed.getFlowFileOutboundPolicy()));
+                groupToUpdate.setParameterContext(parameterContext);
+                groupToUpdate.setVariables(proposed.getVariables());
+                groupToUpdate.setComments(proposed.getComments());
+                groupToUpdate.setName(proposed.getName());
+                groupToUpdate.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
+
+                if (processGroup == null) {
+                    LOG.info("Successfully synchronized {} by adding it to the flow", groupToUpdate);
+                } else {
+                    LOG.info("Successfully synchronized {} by updating it to match proposed version", groupToUpdate);
+                }
+            } finally {
+                // Re-enable all Controller Services that we disabled and restart all processors
+                context.getControllerServiceProvider().enableControllerServicesAsync(controllerServicesToStop);
+
+                for (final ProcessorNode processor : processorsToStop) {
+                    processor.getProcessGroup().startProcessor(processor, false);
+                }
+            }
+        } finally {
+            synchronizationOptions.getComponentScheduler().resume();
+        }
+    }
+
+    private boolean isDoneProcessing(final ProcessGroup group) {
+        for (final ProcessorNode processor : group.getProcessors()) {
+            if (processor.isRunning()) {
+                return false;
+            }
+        }
+
+        for (final Port port : group.getInputPorts()) {
+            if (port.isRunning()) {
+                return false;
+            }
+        }
+
+        for (final Port port : group.getOutputPorts()) {
+            if (port.isRunning()) {
+                return false;
+            }
+        }
+
+        for (final RemoteProcessGroup rpg : group.getRemoteProcessGroups()) {
+            for (final RemoteGroupPort port : rpg.getInputPorts()) {
+                if (port.isRunning()) {
+                    return false;
+                }
+            }
+
+            for (final RemoteGroupPort port : rpg.getOutputPorts()) {
+                if (port.isRunning()) {
+                    return false;
+                }
+            }
+        }
+
+        for (final ProcessGroup childGroup : group.getProcessGroups()) {
+            if (!isDoneProcessing(childGroup)) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Stops the components that feed data into the given Process Group and then waits for its queues to drain.
+     * All input ports of the group are stopped, every processor selected by {@link #isSourceProcessor(ProcessorNode)}
+     * is stopped (or terminated, per the synchronization options), and the method then polls until every connection
+     * in the group is empty or the timeout elapses.
+     *
+     * @param processGroup the group whose data should be bled out
+     * @param timeout the wall-clock time, in epoch milliseconds, after which waiting ends
+     * @param synchronizationOptions options that govern how processors are stopped
+     */
+    private void bleedOut(final ProcessGroup processGroup, final long timeout, final FlowSynchronizationOptions synchronizationOptions)
+                throws FlowSynchronizationException, TimeoutException, InterruptedException {
+        for (final Port inputPort : processGroup.getInputPorts()) {
+            processGroup.stopInputPort(inputPort);
+        }
+
+        final Set<ProcessorNode> sources = new HashSet<>();
+        for (final ProcessorNode candidate : processGroup.findAllProcessors()) {
+            if (isSourceProcessor(candidate)) {
+                sources.add(candidate);
+            }
+        }
+        stopOrTerminate(sources, timeout, synchronizationOptions);
+
+        // Poll until the queues are drained or the deadline passes.
+        final List<Connection> allConnections = processGroup.findAllConnections();
+        waitFor(timeout, () -> connectionsEmpty(allConnections));
+    }
+
+    /**
+     * Polls the given condition every 10 milliseconds until it becomes true or the deadline passes.
+     * The method returns normally in both cases; it does not signal whether the condition was met.
+     *
+     * @param timeout the wall-clock deadline, in epoch milliseconds
+     * @param condition the condition to wait for
+     * @throws InterruptedException if the thread is interrupted while sleeping between checks
+     */
+    private void waitFor(final long timeout, final BooleanSupplier condition) throws InterruptedException {
+        while (true) {
+            // The deadline is checked first so the condition is not evaluated once time has run out.
+            if (System.currentTimeMillis() > timeout || condition.getAsBoolean()) {
+                return;
+            }
+
+            Thread.sleep(10L);
+        }
+    }
+
+    /**
+     * Checks whether the FlowFile queue of every given connection is empty.
+     *
+     * @param connections the connections to check
+     * @return <code>true</code> if no connection has queued data (including when the collection is empty)
+     */
+    private boolean connectionsEmpty(final Collection<Connection> connections) {
+        return connections.stream()
+            .allMatch(connection -> connection.getFlowFileQueue().isEmpty());
+    }
+
+    /**
+     * Determines whether the given processor is a source of data, meaning that none of its incoming connections
+     * originates from another component. Connections that loop from the processor back to itself are ignored, and
+     * a processor with no incoming connections at all is a source.
+     *
+     * @param processor the processor to check
+     * @return <code>true</code> if no other component feeds data into the processor
+     */
+    private boolean isSourceProcessor(final ProcessorNode processor) {
+        // noneMatch, not anyMatch: an incoming connection from a different component means the processor is NOT a source.
+        return processor.getIncomingConnections().stream()
+            .noneMatch(connection -> connection.getSource() != processor);
+    }
+
+    /**
+     * Computes the names of all variables that differ between the given registry and the proposed variable map.
+     * A variable is reported if it was removed, added, or has a different value.
+     *
+     * @param variableRegistry the registry holding the current variables
+     * @param updatedVariables the proposed variable names and values
+     * @return the names of the variables whose value would change
+     */
+    private Set<String> getUpdatedVariableNames(final ComponentVariableRegistry variableRegistry, final Map<String, String> updatedVariables) {
+        // Re-key the current variables by name so they can be compared with the proposed map.
+        final Map<String, String> existingByName = new HashMap<>();
+        variableRegistry.getVariableMap().forEach((descriptor, value) -> existingByName.put(descriptor.getName(), value));
+
+        final Set<String> changedNames = new HashSet<>();
+
+        // Variables that exist today but are absent from the proposal have been removed.
+        for (final String existingName : existingByName.keySet()) {
+            if (!updatedVariables.containsKey(existingName)) {
+                changedNames.add(existingName);
+            }
+        }
+
+        // Variables in the proposal whose value differs from the current value (or that are new) have changed.
+        updatedVariables.forEach((name, proposedValue) -> {
+            if (!Objects.equals(existingByName.get(name), proposedValue)) {
+                changedNames.add(name);
+            }
+        });
+
+        return changedNames;
+    }
+
+
+    /**
+     * Ensures that no Parameter Context known to the flow inherits from the Parameter Context with the given ID.
+     *
+     * @param parameterContextId the identifier of the Parameter Context that is about to be deleted
+     * @throws IllegalStateException if any Parameter Context lists the given one among its inherited contexts
+     */
+    private void verifyNotInherited(final String parameterContextId) {
+        for (final ParameterContext candidate : context.getFlowManager().getParameterContextManager().getParameterContexts()) {
+            for (final ParameterContext inherited : candidate.getInheritedParameterContexts()) {
+                if (inherited.getIdentifier().equals(parameterContextId)) {
+                    throw new IllegalStateException(String.format("Cannot delete Parameter Context with ID [%s] because it is referenced by at least one Parameter Context [%s]",
+                        parameterContextId, candidate.getIdentifier()));
+                }
+            }
+        }
+    }
 
     private void updateParameterContext(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, VersionedParameterContext> versionedParameterContexts,
                                         final ComponentIdGenerator componentIdGenerator) {
@@ -1255,10 +1784,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
     }
 
     private ParameterContext getParameterContextByName(final String contextName) {
-        return context.getFlowManager().getParameterContextManager().getParameterContexts().stream()
-            .filter(context -> context.getName().equals(contextName))
-            .findAny()
-            .orElse(null);
+        return context.getFlowManager().getParameterContextManager().getParameterContextNameMapping().get(contextName);
     }
 
     private ParameterContext createParameterContextWithoutReferences(final VersionedParameterContext versionedParameterContext) {
@@ -1287,8 +1813,28 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
 
     private ParameterContext createParameterContext(final VersionedParameterContext versionedParameterContext, final String parameterContextId,
                                                     final Map<String, VersionedParameterContext> versionedParameterContexts) {
+
+        final Map<String, Parameter> parameters = createParameterMap(versionedParameterContext.getParameters());
+
+        final List<String> parameterContextRefs = new ArrayList<>();
+        if (versionedParameterContext.getInheritedParameterContexts() != null) {
+            versionedParameterContext.getInheritedParameterContexts().stream()
+                .map(name -> createParameterReferenceId(name, versionedParameterContexts))
+                .forEach(parameterContextRefs::add);
+        }
+
+        final AtomicReference<ParameterContext> contextReference = new AtomicReference<>();
+        context.getFlowManager().withParameterContextResolution(() -> {
+            final ParameterContext created = context.getFlowManager().createParameterContext(parameterContextId, versionedParameterContext.getName(), parameters, parameterContextRefs);
+            contextReference.set(created);
+        });
+
+        return contextReference.get();
+    }
+
+    private Map<String, Parameter> createParameterMap(final Collection<VersionedParameter> versionedParameters) {
         final Map<String, Parameter> parameters = new HashMap<>();
-        for (final VersionedParameter versionedParameter : versionedParameterContext.getParameters()) {
+        for (final VersionedParameter versionedParameter : versionedParameters) {
             final ParameterDescriptor descriptor = new ParameterDescriptor.Builder()
                 .name(versionedParameter.getName())
                 .description(versionedParameter.getDescription())
@@ -1299,14 +1845,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
             parameters.put(versionedParameter.getName(), parameter);
         }
 
-        final List<String> parameterContextRefs = new ArrayList<>();
-        if (versionedParameterContext.getInheritedParameterContexts() != null) {
-            versionedParameterContext.getInheritedParameterContexts().stream()
-                .map(name -> createParameterReferenceId(name, versionedParameterContexts))
-                .forEach(parameterContextRefs::add);
-        }
-
-        return context.getFlowManager().createParameterContext(parameterContextId, versionedParameterContext.getName(), parameters, parameterContextRefs);
+        return parameters;
     }
 
     private String createParameterReferenceId(final String parameterContextName, final Map<String, VersionedParameterContext> versionedParameterContexts) {
@@ -1330,7 +1869,6 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
         return selectedParameterContext;
     }
 
-
     private void addMissingConfiguration(final VersionedParameterContext versionedParameterContext, final ParameterContext currentParameterContext,
                                          final Map<String, VersionedParameterContext> versionedParameterContexts) {
         final Map<String, Parameter> parameters = new HashMap<>();
@@ -1353,7 +1891,6 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
 
         currentParameterContext.setParameters(parameters);
 
-
         // If the current parameter context doesn't have any inherited param contexts but the versioned one does,
         // add the versioned ones.
         if (versionedParameterContext.getInheritedParameterContexts() != null && !versionedParameterContext.getInheritedParameterContexts().isEmpty()
@@ -1406,12 +1943,66 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
         }
     }
 
-    private void updateFunnel(final Funnel funnel, final VersionedFunnel proposed) {
-        funnel.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
-    }
+    @Override
+    public void synchronize(final Funnel funnel, final VersionedFunnel proposed, final ProcessGroup group, final FlowSynchronizationOptions synchronizationOptions)
+        throws FlowSynchronizationException, TimeoutException, InterruptedException {
 
-    private Funnel addFunnel(final ProcessGroup destination, final VersionedFunnel proposed, final ComponentIdGenerator componentIdGenerator) {
-        final String id = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destination.getIdentifier());
+        if (funnel == null && proposed == null) {
+            return;
+        }
+
+        final long timeout = System.currentTimeMillis() + synchronizationOptions.getComponentStopTimeout().toMillis();
+
+        if (proposed == null) {
+            verifyCanDelete(funnel, timeout);
+        } else if (funnel != null) {
+            funnel.verifyCanUpdate();
+        }
+
+        final Set<Connectable> toRestart = new HashSet<>();
+        try {
+            if (proposed == null) {
+                final Set<Connectable> stoppedDownstream = stopDownstreamComponents(funnel, timeout, synchronizationOptions);
+                toRestart.addAll(stoppedDownstream);
+
+                funnel.getProcessGroup().removeFunnel(funnel);
+                LOG.info("Successfully synchronized {} by removing it from the flow", funnel);
+            } else if (funnel == null) {
+                final Funnel added = addFunnel(group, proposed, synchronizationOptions.getComponentIdGenerator());
+                LOG.info("Successfully synchronized {} by adding it to the flow", added);
+            } else {
+                updateFunnel(funnel, proposed);
+                LOG.info("Successfully synchronized {} by updating it to match proposed version", funnel);
+            }
+        } finally {
+            // Restart any components that need to be restarted.
+            for (final Connectable stoppedComponent : toRestart) {
+                context.getComponentScheduler().startComponent(stoppedComponent);
+            }
+        }
+    }
+
+    /**
+     * Synchronizes a label with its proposed versioned definition: the label is updated when both are given,
+     * removed from its Process Group when nothing is proposed, and added to <code>group</code> when it does not
+     * exist yet. Nothing happens when both arguments are <code>null</code>.
+     *
+     * @param label the existing label, or <code>null</code> if it must be created
+     * @param proposed the proposed definition, or <code>null</code> if the label must be removed
+     * @param group the Process Group that receives a newly added label
+     * @param synchronizationOptions supplies the component ID generator used when adding a label
+     */
+    @Override
+    public void synchronize(final Label label, final VersionedLabel proposed, final ProcessGroup group, final FlowSynchronizationOptions synchronizationOptions) {
+        final boolean labelExists = label != null;
+        final boolean labelProposed = proposed != null;
+
+        if (labelExists && labelProposed) {
+            updateLabel(label, proposed);
+            return;
+        }
+
+        if (labelExists) {
+            label.getProcessGroup().removeLabel(label);
+            return;
+        }
+
+        if (labelProposed) {
+            addLabel(group, proposed, synchronizationOptions.getComponentIdGenerator());
+        }
+    }
+
+    private void updateFunnel(final Funnel funnel, final VersionedFunnel proposed) {
+        funnel.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
+    }
+
+    private Funnel addFunnel(final ProcessGroup destination, final VersionedFunnel proposed, final ComponentIdGenerator componentIdGenerator) {
+        final String id = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destination.getIdentifier());
         final Funnel funnel = context.getFlowManager().createFunnel(id);
         funnel.setVersionedComponentId(proposed.getIdentifier());
         destination.addFunnel(funnel);
@@ -1445,6 +2036,82 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
         port.setName(name);
     }
 
+    /**
+     * Verifies that the given port can be synchronized with the proposed definition. When nothing is proposed the
+     * port must be deletable; otherwise the proposed component type must be an input port or an output port.
+     *
+     * @param port the existing port, possibly <code>null</code>
+     * @param proposed the proposed definition, or <code>null</code> if the port is to be removed
+     * @param timeout the wall-clock deadline, in epoch milliseconds, passed on to the delete verification
+     * @throws FlowSynchronizationException if the proposed component type is not a port type
+     */
+    private void verifyCanSynchronize(final Port port, final VersionedPort proposed, final long timeout) throws InterruptedException, TimeoutException, FlowSynchronizationException {
+        if (proposed != null) {
+            final ComponentType proposedType = proposed.getComponentType();
+            final boolean portType = proposedType == ComponentType.INPUT_PORT || proposedType == ComponentType.OUTPUT_PORT;
+
+            if (!portType) {
+                throw new FlowSynchronizationException("Cannot synchronize port " + port + " with the proposed Port definition because its type is "
+                    + proposedType + " and expected either an INPUT_PORT or an OUTPUT_PORT");
+            }
+        } else {
+            // No proposed definition means the port is going away, so it must be safe to delete.
+            verifyCanDelete(port, timeout);
+        }
+    }
+
+    /**
+     * Synchronizes a port with its proposed versioned definition. When only the proposed definition is given, an
+     * input or output port is added to <code>group</code>; when only the existing port is given, it is removed from
+     * its Process Group; when both are given, the existing port is updated. Nothing happens when both are
+     * <code>null</code>. The component scheduler is paused for the duration of the call and resumed afterward, and
+     * any component that was stopped in order to perform the change is started again.
+     *
+     * @param port the existing port, or <code>null</code> if it must be created
+     * @param proposed the proposed definition, or <code>null</code> if the port must be removed
+     * @param group the Process Group that receives a newly added port
+     * @param synchronizationOptions options supplying the stop timeout, scheduler, and component ID generator
+     * @throws FlowSynchronizationException if the port cannot be synchronized with the proposed definition
+     * @throws TimeoutException may be thrown while stopping components or verifying that the port can be deleted
+     * @throws InterruptedException may be thrown while verifying that the port can be deleted
+     */
+    @Override
+    public void synchronize(final Port port, final VersionedPort proposed, final ProcessGroup group, final FlowSynchronizationOptions synchronizationOptions)
+        throws FlowSynchronizationException, TimeoutException, InterruptedException {
+
+        if (port == null && proposed == null) {
+            return;
+        }
+
+        final long timeout = System.currentTimeMillis() + synchronizationOptions.getComponentStopTimeout().toMillis();
+        verifyCanSynchronize(port, proposed, timeout);
+
+        synchronizationOptions.getComponentScheduler().pause();
+        try {
+            final Set<Connectable> toRestart = new HashSet<>();
+            if (port != null) {
+                final boolean stopped = stopOrTerminate(port, timeout, synchronizationOptions);
+                // Only restart the port if it still exists once synchronization is complete.
+                if (stopped && proposed != null) {
+                    toRestart.add(port);
+                }
+            }
+
+            try {
+                if (port == null) {
+                    final ComponentType proposedType = proposed.getComponentType();
+
+                    // Keep the newly created port so that the log message identifies it; 'port' is null in this branch.
+                    final Port added;
+                    if (proposedType == ComponentType.INPUT_PORT) {
+                        added = addInputPort(group, proposed, synchronizationOptions.getComponentIdGenerator(), proposed.getName());
+                    } else {
+                        added = addOutputPort(group, proposed, synchronizationOptions.getComponentIdGenerator(), proposed.getName());
+                    }
+
+                    LOG.info("Successfully synchronized {} by adding it to the flow", added);
+                } else if (proposed == null) {
+                    final Set<Connectable> stoppedDownstream = stopDownstreamComponents(port, timeout, synchronizationOptions);
+                    toRestart.addAll(stoppedDownstream);
+
+                    // Verify again now that the port and its downstream components have been stopped.
+                    verifyCanDelete(port, timeout);
+
+                    switch (port.getConnectableType()) {
+                        case INPUT_PORT:
+                            port.getProcessGroup().removeInputPort(port);
+                            break;
+                        case OUTPUT_PORT:
+                            port.getProcessGroup().removeOutputPort(port);
+                            break;
+                    }
+
+                    LOG.info("Successfully synchronized {} by removing it from the flow", port);
+                } else {
+                    updatePort(port, proposed, proposed.getName());
+                    LOG.info("Successfully synchronized {} by updating it to match proposed version", port);
+                }
+            } finally {
+                // Restart any components that need to be restarted.
+                for (final Connectable stoppedComponent : toRestart) {
+                    context.getComponentScheduler().startComponent(stoppedComponent);
+                }
+            }
+        } finally {
+            synchronizationOptions.getComponentScheduler().resume();
+        }
+    }
+
     private void updatePort(final Port port, final VersionedPort proposed, final String temporaryName) {
         final String name = temporaryName != null ? temporaryName : proposed.getName();
         port.setComments(proposed.getComments());
@@ -1531,6 +2198,248 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
         return procNode;
     }
 
+    /**
+     * Verifies that the given processor can be synchronized with the proposed definition. A missing processor can
+     * always be created, a processor without a proposed definition must be deletable, and otherwise the processor
+     * must be in a state that allows it to be updated.
+     *
+     * @param processor the existing processor, or <code>null</code> if it is to be created
+     * @param proposedProcessor the proposed definition, or <code>null</code> if the processor is to be removed
+     * @param timeout the wall-clock deadline, in epoch milliseconds, passed on to the delete verification
+     */
+    private void verifyCanSynchronize(final ProcessorNode processor, final VersionedProcessor proposedProcessor, final long timeout)
+        throws InterruptedException, TimeoutException, FlowSynchronizationException {
+
+        if (processor != null) {
+            if (proposedProcessor == null) {
+                // The processor is going away, so it must be in a state that allows removal.
+                verifyCanDelete(processor, timeout);
+            } else {
+                // The processor stays, so it must be in a state that allows modification.
+                processor.verifyCanUpdate();
+            }
+        }
+    }
+
+    private void verifyCanDelete(final Connectable connectable, final long timeout) throws InterruptedException, TimeoutException, FlowSynchronizationException {
+        verifyNoIncomingConnections(connectable); // fails if another component still feeds this one (self-loops are allowed)
+        verifyCanDeleteConnections(connectable, timeout); // each connection must be deletable; waits for queues to empty, bounded by the deadline
+        connectable.verifyCanDelete(true); // finally ask the component itself; NOTE(review): 'true' presumably means "ignore connections" - confirm
+    }
+
+    /**
+     * Verifies that every connection of the given component can be deleted once its queue is empty, and then waits
+     * for each queue to become empty. All connections are verified before any waiting begins.
+     *
+     * @param connectable the component whose connections are checked
+     * @param timeout the wall-clock deadline, in epoch milliseconds; the time remaining until it is passed to each wait
+     */
+    private void verifyCanDeleteConnections(final Connectable connectable, final long timeout) throws InterruptedException, TimeoutException, FlowSynchronizationException {
+        final Set<Connection> attachedConnections = connectable.getConnections();
+
+        // First pass: fail fast if any connection could not be deleted even with an empty queue.
+        for (final Connection attached : attachedConnections) {
+            verifyCanDeleteWhenQueueEmpty(attached);
+        }
+
+        // Second pass: wait for the queues to drain, giving each wait whatever time is left before the deadline.
+        for (final Connection attached : attachedConnections) {
+            final long remainingMillis = timeout - System.currentTimeMillis();
+            waitForQueueEmpty(attached, Duration.ofMillis(remainingMillis));
+        }
+    }
+
+    /**
+     * Ensures that the given component receives data from no component other than itself.
+     *
+     * @param connectable the component that is about to be removed
+     * @throws FlowSynchronizationException if an incoming connection originates from a different component
+     */
+    private void verifyNoIncomingConnections(final Connectable connectable) throws FlowSynchronizationException {
+        for (final Connection incoming : connectable.getIncomingConnections()) {
+            // Self-loops are removed together with the component, so only foreign sources are a problem.
+            final boolean fromOtherComponent = incoming.getSource() != connectable;
+            if (fromOtherComponent) {
+                throw new FlowSynchronizationException("Cannot remove " + connectable + " because it has an incoming connection from " + incoming.getSource());
+            }
+        }
+    }
+
+    /**
+     * Synchronizes a processor with its proposed versioned definition. When only the proposed definition is given,
+     * the processor is added to <code>group</code>; when only the existing processor is given, it is removed from
+     * its Process Group; when both are given, the existing processor is updated. Nothing happens when both are
+     * <code>null</code>. The component scheduler is paused for the duration of the call and resumed afterward.
+     *
+     * @param processor the existing processor, or <code>null</code> if it must be created
+     * @param proposedProcessor the proposed definition, or <code>null</code> if the processor must be removed
+     * @param group the Process Group that receives a newly added processor
+     * @param synchronizationOptions options supplying the stop timeout, scheduler, and component ID generator
+     * @throws FlowSynchronizationException if the add, remove, or update step fails
+     * @throws TimeoutException may be thrown while stopping the processor or verifying that it can be synchronized
+     * @throws InterruptedException may be thrown while verifying that the processor can be synchronized
+     */
+    @Override
+    public void synchronize(final ProcessorNode processor, final VersionedProcessor proposedProcessor, final ProcessGroup group, final FlowSynchronizationOptions synchronizationOptions)
+        throws FlowSynchronizationException, TimeoutException, InterruptedException {
+
+        if (processor == null && proposedProcessor == null) {
+            return;
+        }
+
+        setSynchronizationOptions(synchronizationOptions);
+        final long timeout = System.currentTimeMillis() + synchronizationOptions.getComponentStopTimeout().toMillis();
+
+        synchronizationOptions.getComponentScheduler().pause();
+        try {
+            final Set<Connectable> componentsToRestart = new HashSet<>();
+
+            // An existing processor must be stopped before it can be changed or removed.
+            if (processor != null) {
+                final boolean wasStopped = stopOrTerminate(processor, timeout, synchronizationOptions);
+
+                // Restart it afterward only if it still exists and the proposed state is RUNNING.
+                if (wasStopped && proposedProcessor != null && proposedProcessor.getScheduledState() == org.apache.nifi.flow.ScheduledState.RUNNING) {
+                    componentsToRestart.add(processor);
+                }
+            }
+
+            try {
+                verifyCanSynchronize(processor, proposedProcessor, timeout);
+
+                try {
+                    if (proposedProcessor == null) {
+                        // Removal: downstream components are stopped first and restarted once the processor is gone.
+                        componentsToRestart.addAll(stopDownstreamComponents(processor, timeout, synchronizationOptions));
+
+                        processor.getProcessGroup().removeProcessor(processor);
+                        LOG.info("Successfully synchronized {} by removing it from the flow", processor);
+                    } else if (processor == null) {
+                        final ProcessorNode created = addProcessor(group, proposedProcessor, synchronizationOptions.getComponentIdGenerator());
+                        LOG.info("Successfully synchronized {} by adding it to the flow", created);
+                    } else {
+                        updateProcessor(processor, proposedProcessor);
+                        LOG.info("Successfully synchronized {} by updating it to match proposed version", processor);
+                    }
+                } catch (final Exception e) {
+                    throw new FlowSynchronizationException("Failed to synchronize processor " + processor + " with proposed version", e);
+                }
+            } finally {
+                // Whatever happened above, bring back every component that was stopped and should run again.
+                for (final Connectable toStart : componentsToRestart) {
+                    context.getComponentScheduler().startComponent(toStart);
+                }
+            }
+        } finally {
+            synchronizationOptions.getComponentScheduler().resume();
+        }
+    }
+
+    /**
+     * Stops (or terminates) the destination of every connection of the given component.
+     *
+     * @param component the component whose downstream components are to be stopped
+     * @param timeout the wall-clock deadline, in epoch milliseconds, for stopping each destination
+     * @param synchronizationOptions options that govern how components are stopped
+     * @return the destinations that were running and have been stopped by this call
+     */
+    private Set<Connectable> stopDownstreamComponents(final Connectable component, final long timeout, final FlowSynchronizationOptions synchronizationOptions)
+        throws FlowSynchronizationException, TimeoutException {
+
+        final Set<Connectable> stoppedDestinations = new HashSet<>();
+
+        for (final Connection outgoing : component.getConnections()) {
+            final Connectable destination = outgoing.getDestination();
+
+            // Only remember destinations that this call actually stopped, so that only those get restarted.
+            if (stopOrTerminate(destination, timeout, synchronizationOptions)) {
+                stoppedDestinations.add(destination);
+            }
+        }
+
+        return stoppedDestinations;
+    }
+
+    private Set<Connectable> getDownstreamComponents(final Connectable component, final boolean includeSelf) {
+        final Set<Connectable> components = new HashSet<>();
+        if (includeSelf) {
+            components.add(component);
+        }
+
+        for (final Connection connection : component.getConnections()) {
+            components.add(connection.getDestination());
+        }
+
+        return components;
+    }
+
+    /**
+     * Stops (or terminates) each of the given components.
+     *
+     * @param components the components to stop
+     * @param timeout the wall-clock deadline, in epoch milliseconds, for stopping each component
+     * @param synchronizationOptions options that govern how components are stopped
+     * @param <T> the type of component
+     * @return the subset of components that were running and have been stopped by this call
+     */
+    private <T extends Connectable> Set<T> stopOrTerminate(final Set<T> components, final long timeout, final FlowSynchronizationOptions synchronizationOptions)
+        throws TimeoutException, FlowSynchronizationException {
+
+        final Set<T> actuallyStopped = new HashSet<>();
+
+        for (final T candidate : components) {
+            if (stopOrTerminate(candidate, timeout, synchronizationOptions)) {
+                actuallyStopped.add(candidate);
+            }
+        }
+
+        return actuallyStopped;
+    }
+
+    /**
+     * Stops the given component if it is running. Input and output ports are stopped through their Process Group,
+     * processors are stopped or terminated via the processor-specific overload, and any other type of component is
+     * left untouched.
+     *
+     * @param component the component to stop
+     * @param timeout the wall-clock deadline, in epoch milliseconds, used when stopping a processor
+     * @param synchronizationOptions options that govern how a processor is stopped
+     * @return <code>true</code> if the component was stopped by this call, <code>false</code> if it was not running
+     *         or is of a type that this method does not stop
+     */
+    private boolean stopOrTerminate(final Connectable component, final long timeout, final FlowSynchronizationOptions synchronizationOptions) throws TimeoutException, FlowSynchronizationException {
+        if (!component.isRunning()) {
+            return false;
+        }
+
+        final boolean stopped;
+        switch (component.getConnectableType()) {
+            case INPUT_PORT:
+                component.getProcessGroup().stopInputPort((Port) component);
+                stopped = true;
+                break;
+            case OUTPUT_PORT:
+                component.getProcessGroup().stopOutputPort((Port) component);
+                stopped = true;
+                break;
+            case PROCESSOR:
+                stopped = stopOrTerminate((ProcessorNode) component, timeout, synchronizationOptions);
+                break;
+            default:
+                stopped = false;
+                break;
+        }
+
+        return stopped;
+    }
+
+    /**
+     * Stops the given processor and, if it does not stop before the deadline, applies the configured timeout action:
+     * either the timeout is propagated to the caller or the processor is terminated.
+     *
+     * @param processor the processor to stop
+     * @param timeout the wall-clock deadline, in epoch milliseconds
+     * @param synchronizationOptions supplies the action to take when the processor does not stop in time
+     * @return <code>true</code> if the processor was stopped or terminated, <code>false</code> if it was not running
+     * @throws TimeoutException if the processor did not stop in time and the timeout action is THROW_TIMEOUT_EXCEPTION
+     */
+    private boolean stopOrTerminate(final ProcessorNode processor, final long timeout, final FlowSynchronizationOptions synchronizationOptions) throws TimeoutException, FlowSynchronizationException {
+        LOG.debug("Stopping {} in order to synchronize it with proposed version", processor);
+
+        try {
+            return stopProcessor(processor, timeout);
+        } catch (final TimeoutException stopTimedOut) {
+            switch (synchronizationOptions.getComponentStopTimeoutAction()) {
+                case THROW_TIMEOUT_EXCEPTION:
+                    throw stopTimedOut;
+                default:
+                    // TERMINATE and any other action fall through to terminating the processor below.
+                    break;
+            }
+
+            processor.terminate();
+            return true;
+        }
+    }
+
+    /**
+     * Asks the processor's Process Group to stop the processor and waits for the stop to complete.
+     *
+     * @param processor the processor to stop
+     * @param timeout the wall-clock deadline, in epoch milliseconds; the wait lasts for the time remaining until it
+     * @return <code>true</code> if the processor was running and has stopped, <code>false</code> if it was not running
+     * @throws FlowSynchronizationException if stopping fails or the thread is interrupted while waiting
+     * @throws TimeoutException if the processor does not stop before the deadline
+     */
+    private boolean stopProcessor(final ProcessorNode processor, final long timeout) throws FlowSynchronizationException, TimeoutException {
+        if (!processor.isRunning()) {
+            return false;
+        }
+
+        final Future<Void> stopFuture = processor.getProcessGroup().stopProcessor(processor);
+        try {
+            final long remainingMillis = timeout - System.currentTimeMillis();
+            stopFuture.get(remainingMillis, TimeUnit.MILLISECONDS);
+        } catch (final InterruptedException ie) {
+            // Preserve the interrupt status for callers further up the stack.
+            Thread.currentThread().interrupt();
+            throw new FlowSynchronizationException("Interrupted while waiting for processor " + processor + " to stop", ie);
+        } catch (final ExecutionException ee) {
+            throw new FlowSynchronizationException("Failed to stop processor " + processor, ee.getCause());
+        }
+
+        return true;
+    }
+
+    /**
+     * Prepares a Controller Service for synchronization by unscheduling the components that reference it and, if
+     * the service is active, disabling it together with the active services that reference it.
+     *
+     * @param controllerService the service to stop; nothing happens if it is <code>null</code>
+     * @param proposed the proposed definition of the service, possibly <code>null</code>
+     * @param timeout the wall-clock deadline, in epoch milliseconds, for the stop operations
+     * @param timeoutAction the action to take when a stop operation does not complete in time
+     * @param referencesStopped receives every referencing component that was unscheduled
+     * @param servicesDisabled receives the active referencing services, plus the service itself when it is active and
+     *                         the proposed state is not DISABLED
+     */
+    private void stopControllerService(final ControllerServiceNode controllerService, final VersionedControllerService proposed, final long timeout,
+                                       final FlowSynchronizationOptions.ComponentStopTimeoutAction timeoutAction, final Set<ComponentNode> referencesStopped,
+                                       final Set<ControllerServiceNode> servicesDisabled) throws FlowSynchronizationException,
+        TimeoutException, InterruptedException {
+        final ControllerServiceProvider provider = context.getControllerServiceProvider();
+        if (controllerService == null) {
+            return;
+        }
+
+        // Unschedule everything that references the service and wait for each of those components to stop.
+        final Map<ComponentNode, Future<Void>> unscheduleFutures = provider.unscheduleReferencingComponents(controllerService);
+        referencesStopped.addAll(unscheduleFutures.keySet());
+
+        for (final Map.Entry<ComponentNode, Future<Void>> unscheduled : unscheduleFutures.entrySet()) {
+            waitForStopCompletion(unscheduled.getValue(), unscheduled.getKey(), timeout, timeoutAction);
+        }
+
+        if (!controllerService.isActive()) {
+            return;
+        }
+
+        // An active service can only be disabled after all services that reference it have been disabled.
+        final List<ControllerServiceNode> dependentServices = controllerService.getReferences().findRecursiveReferences(ControllerServiceNode.class);
+
+        // Record the service itself only when the proposed state is something other than DISABLED.
+        if (proposed != null && proposed.getScheduledState() != org.apache.nifi.flow.ScheduledState.DISABLED) {
+            servicesDisabled.add(controllerService);
+        }
+
+        dependentServices.stream()
+            .filter(ControllerServiceNode::isActive)
+            .forEach(servicesDisabled::add);
+
+        // Stop every recorded service plus the service being synchronized, whether or not it was recorded.
+        final Set<ControllerServiceNode> toDisable = new HashSet<>(servicesDisabled);
+        toDisable.add(controllerService);
+
+        // Disable the services and wait for completion, up to the timeout allowed.
+        final Future<Void> disableFuture = provider.disableControllerServicesAsync(toDisable);
+        waitForStopCompletion(disableFuture, controllerService, timeout, timeoutAction);
+    }
+
+
     private void updateProcessor(final ProcessorNode processor, final VersionedProcessor proposed) throws ProcessorInstantiationException {
         LOG.debug("Updating Processor {}", processor);
 
@@ -1542,7 +2451,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
             processor.setName(proposed.getName());
             processor.setPenalizationPeriod(proposed.getPenaltyDuration());
 
-            final Map<String, String> properties = populatePropertiesMap(processor, proposed.getProperties(), proposed.getPropertyDescriptors(), processor.getProcessGroup());
+            final Map<String, String> properties = populatePropertiesMap(processor, proposed.getProperties(), processor.getProcessGroup());
             processor.setProperties(properties, true);
             processor.setRunDuration(proposed.getRunDurationMillis(), TimeUnit.MILLISECONDS);
             processor.setSchedulingStrategy(SchedulingStrategy.valueOf(proposed.getSchedulingStrategy()));
@@ -1556,6 +2465,16 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
             processor.setMaxBackoffPeriod(proposed.getMaxBackoffPeriod());
             processor.setRetriedRelationships(proposed.getRetriedRelationships());
 
+            final Set<String> proposedAutoTerminated = proposed.getAutoTerminatedRelationships();
+            if (proposedAutoTerminated != null) {
+                final Set<Relationship> relationshipsToAutoTerminate = proposedAutoTerminated.stream()
+                    .map(processor::getRelationship)
+                    .filter(Objects::nonNull)
+                    .collect(Collectors.toSet());
+
+                processor.setAutoTerminatedRelationships(relationshipsToAutoTerminate);
+            }
+
             if (proposed.getRetryCount() != null) {
                 processor.setRetryCount(proposed.getRetryCount());
             } else {
@@ -1580,7 +2499,6 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
         }
     }
 
-
     private String getServiceInstanceId(final String serviceVersionedComponentId, final ProcessGroup group) {
         for (final ControllerServiceNode serviceNode : group.getControllerServices(false)) {
             final String versionedId = serviceNode.getVersionedComponentId().orElse(
@@ -1596,7 +2514,110 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
         }
 
         return getServiceInstanceId(serviceVersionedComponentId, parent);
+    }
+
+    /**
+     * Synchronizes a Remote Process Group (RPG) with its proposed versioned state. A <code>null</code> proposed
+     * version removes the RPG, a <code>null</code> RPG adds one to the given group, and otherwise the existing RPG
+     * is updated. Nothing happens when both are <code>null</code>.
+     *
+     * The component scheduler is paused for the duration of the call and resumed afterwards. Any exception raised
+     * while adding, updating or removing the RPG is wrapped in a {@link FlowSynchronizationException}.
+     */
+    @Override
+    public void synchronize(final RemoteProcessGroup rpg, final VersionedRemoteProcessGroup proposed, final ProcessGroup group, final FlowSynchronizationOptions synchronizationOptions)
+        throws FlowSynchronizationException, TimeoutException, InterruptedException {
+
+        final boolean nothingToSynchronize = rpg == null && proposed == null;
+        if (nothingToSynchronize) {
+            return;
+        }
+
+        setSynchronizationOptions(synchronizationOptions);
+        final long stopDeadline = System.currentTimeMillis() + synchronizationOptions.getComponentStopTimeout().toMillis();
+
+        synchronizationOptions.getComponentScheduler().pause();
+        try {
+            // Everything that gets stopped along the way and has to be started again once we are done.
+            final Set<Connectable> componentsToRestart = new HashSet<>();
+
+            // An existing RPG must stop transmitting before it can be updated or removed.
+            if (rpg != null && rpg.isTransmitting()) {
+                final Set<RemoteGroupPort> previouslyTransmitting = getTransmittingPorts(rpg);
+
+                final Future<?> stopFuture = rpg.stopTransmitting();
+                waitForStopCompletion(stopFuture, rpg, stopDeadline, synchronizationOptions.getComponentStopTimeoutAction());
+
+                // Resume the ports only if the proposed version is transmitting; isTransmitting(null) is false.
+                if (isTransmitting(proposed)) {
+                    componentsToRestart.addAll(previouslyTransmitting);
+                }
+            }
+
+            try {
+                if (rpg == null) {
+                    final RemoteProcessGroup added = addRemoteProcessGroup(group, proposed, synchronizationOptions.getComponentIdGenerator());
+                    LOG.info("Successfully synchronized {} by adding it to the flow", added);
+                } else if (proposed == null) {
+                    // Components downstream of the RPG's output ports are stopped first so the RPG can be deleted
+                    for (final RemoteGroupPort outputPort : rpg.getOutputPorts()) {
+                        componentsToRestart.addAll(stopDownstreamComponents(outputPort, stopDeadline, synchronizationOptions));
+                    }
+
+                    // Every port, inputs first and then outputs, must be deletable before the RPG is removed
+                    for (final RemoteGroupPort inputPort : rpg.getInputPorts()) {
+                        verifyCanDelete(inputPort, stopDeadline);
+                    }
+                    for (final RemoteGroupPort outputPort : rpg.getOutputPorts()) {
+                        verifyCanDelete(outputPort, stopDeadline);
+                    }
+
+                    rpg.getProcessGroup().removeRemoteProcessGroup(rpg);
+                    LOG.info("Successfully synchronized {} by removing it from the flow", rpg);
+                } else {
+                    updateRemoteProcessGroup(rpg, proposed, synchronizationOptions.getComponentIdGenerator());
+                    LOG.info("Successfully synchronized {} by updating it to match proposed version", rpg);
+                }
+            } catch (final Exception e) {
+                throw new FlowSynchronizationException("Failed to synchronize " + rpg + " with proposed version", e);
+            } finally {
+                // Restart whatever was stopped, whether or not the synchronization succeeded.
+                for (final Connectable toStart : componentsToRestart) {
+                    context.getComponentScheduler().startComponent(toStart);
+                }
+            }
+        } finally {
+            synchronizationOptions.getComponentScheduler().resume();
+        }
+    }
+
+    /**
+     * Determines whether the proposed Remote Process Group is meant to be transmitting, i.e. whether at least one
+     * of its input or output ports has a scheduled state of RUNNING.
+     *
+     * @param versionedRpg the proposed Remote Process Group, may be <code>null</code>
+     * @return <code>true</code> if any port is RUNNING; <code>false</code> if the group is <code>null</code>, has no
+     * ports, or has no RUNNING port
+     */
+    private boolean isTransmitting(final VersionedRemoteProcessGroup versionedRpg) {
+        if (versionedRpg == null) {
+            return false;
+        }
+
+        // A port set that was never populated is treated like an empty one rather than failing with a NullPointerException.
+        if (versionedRpg.getInputPorts() != null) {
+            for (final VersionedRemoteGroupPort port : versionedRpg.getInputPorts()) {
+                if (port.getScheduledState() == org.apache.nifi.flow.ScheduledState.RUNNING) {
+                    return true;
+                }
+            }
+        }
+
+        if (versionedRpg.getOutputPorts() != null) {
+            for (final VersionedRemoteGroupPort port : versionedRpg.getOutputPorts()) {
+                if (port.getScheduledState() == org.apache.nifi.flow.ScheduledState.RUNNING) {
+                    return true;
+                }
+            }
+        }
 
+        return false;
+    }
+
+    /**
+     * Collects the input and output ports of the given Remote Process Group whose scheduled state is RUNNING.
+     *
+     * @param rpg the Remote Process Group to inspect, may be <code>null</code>
+     * @return the RUNNING ports, or an empty set if the group is <code>null</code>
+     */
+    private Set<RemoteGroupPort> getTransmittingPorts(final RemoteProcessGroup rpg) {
+        if (rpg == null) {
+            return Collections.emptySet();
+        }
+
+        final Set<RemoteGroupPort> runningPorts = new HashSet<>();
+        for (final RemoteGroupPort inputPort : rpg.getInputPorts()) {
+            if (inputPort.getScheduledState() == ScheduledState.RUNNING) {
+                runningPorts.add(inputPort);
+            }
+        }
+
+        for (final RemoteGroupPort outputPort : rpg.getOutputPorts()) {
+            if (outputPort.getScheduledState() == ScheduledState.RUNNING) {
+                runningPorts.add(outputPort);
+            }
+        }
+
+        return runningPorts;
     }
 
     private RemoteProcessGroup addRemoteProcessGroup(final ProcessGroup destination, final VersionedRemoteProcessGroup proposed, final ComponentIdGenerator componentIdGenerator) {
@@ -1607,6 +2628,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
         destination.addRemoteProcessGroup(rpg);
         updateRemoteProcessGroup(rpg, proposed, componentIdGenerator);
 
+        rpg.initialize();
         return rpg;
     }
 
@@ -1652,25 +2674,33 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
     }
 
+    /**
+     * Resolves the input port of the given Remote Process Group that corresponds to the given versioned port by
+     * delegating to getRpgPort with the group's input-port lookup (rpg::getInputPort) and its current input ports.
+     */
     private RemoteGroupPort getRpgInputPort(final VersionedRemoteGroupPort port, final RemoteProcessGroup rpg, final ComponentIdGenerator componentIdGenerator) {
-        return getRpgPort(port, rpg, componentIdGenerator, RemoteProcessGroup::getInputPort);
+        return getRpgPort(port, rpg, componentIdGenerator, rpg::getInputPort, rpg.getInputPorts());
     }
 
+    /**
+     * Resolves the output port of the given Remote Process Group that corresponds to the given versioned port by
+     * delegating to getRpgPort with the group's output-port lookup (rpg::getOutputPort) and its current output ports.
+     */
     private RemoteGroupPort getRpgOutputPort(final VersionedRemoteGroupPort port, final RemoteProcessGroup rpg, final ComponentIdGenerator componentIdGenerator) {
-        return getRpgPort(port, rpg, componentIdGenerator, RemoteProcessGroup::getOutputPort);
+        return getRpgPort(port, rpg, componentIdGenerator, rpg::getOutputPort, rpg.getOutputPorts());
     }
 
+    /**
+     * Locates the port of a Remote Process Group that corresponds to a versioned port. Candidates are tried in this
+     * order and the first match wins:
+     * <ol>
+     *   <li>the port found by <code>portLookup</code> for the versioned port's instance identifier, if it has one</li>
+     *   <li>the first port in <code>ports</code> whose name equals the versioned port's name</li>
+     *   <li>whatever <code>portLookup</code> yields for an identifier generated by the component id generator</li>
+     * </ol>
+     *
+     * @param port the versioned port to resolve
+     * @param rpg the Remote Process Group that owns the ports; its identifier is used when generating the fallback id
+     * @param componentIdGenerator generates the fallback port identifier
+     * @param portLookup looks up a port of the group by identifier
+     * @param ports the ports searched by name
+     * @return the matching port, or the result of the final lookup (NOTE(review): presumably <code>null</code> when
+     * the group has no such port - confirm against the lookup implementation)
+     */
     private RemoteGroupPort getRpgPort(final VersionedRemoteGroupPort port, final RemoteProcessGroup rpg, final ComponentIdGenerator componentIdGenerator,
-                                       final BiFunction<RemoteProcessGroup, String, RemoteGroupPort> portLookup) {
+                                       final Function<String, RemoteGroupPort> portLookup, final Set<RemoteGroupPort> ports) {
+        // 1. Prefer an exact match on the instance identifier.
         final String instanceId = port.getInstanceIdentifier();
         if (instanceId != null) {
-            final RemoteGroupPort remoteGroupPort = portLookup.apply(rpg, instanceId);
+            final RemoteGroupPort remoteGroupPort = portLookup.apply(instanceId);
             if (remoteGroupPort != null) {
                 return remoteGroupPort;
             }
         }
 
+        // 2. Otherwise fall back to the first port with the same name.
+        final Optional<RemoteGroupPort> portByName = ports.stream()
+                .filter(p -> p.getName().equals(port.getName()))
+                .findFirst();
+        if (portByName.isPresent()) {
+            return portByName.get();
+        }
+
+
+        // 3. Finally, look the port up by a generated identifier.
         final String portId = componentIdGenerator.generateUuid(port.getIdentifier(), port.getInstanceIdentifier(), rpg.getIdentifier());
-        final RemoteGroupPort remoteGroupPort = portLookup.apply(rpg, portId);
+        final RemoteGroupPort remoteGroupPort = portLookup.apply(portId);
         return remoteGroupPort;
     }
 
@@ -1711,6 +2741,144 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
         return descriptor;
     }
 
+    /**
+     * Verifies that the connection can be synchronized with the proposed version. Only a removal (a
+     * <code>null</code> proposed connection) needs a check: the connection must then be deletable.
+     *
+     * @throws FlowSynchronizationException if the connection is to be removed but cannot be deleted
+     */
+    private void verifyCanSynchronize(final Connection connection, final VersionedConnection proposedConnection) throws FlowSynchronizationException {
+        final boolean connectionWillBeRemoved = proposedConnection == null;
+        if (!connectionWillBeRemoved) {
+            return;
+        }
+
+        verifyCanDeleteWhenQueueEmpty(connection);
+    }
+
+    /**
+     * Verifies that the connection can be deleted once its queue is empty. An empty queue always passes. A
+     * non-empty queue passes only if the destination is not DISABLED, STOPPED or STOPPING.
+     *
+     * @throws FlowSynchronizationException if data is queued and the destination is DISABLED, STOPPED or STOPPING
+     */
+    private void verifyCanDeleteWhenQueueEmpty(final Connection connection) throws FlowSynchronizationException {
+        if (connection.getFlowFileQueue().isEmpty()) {
+            return;
+        }
+
+        final ScheduledState destinationState = connection.getDestination().getScheduledState();
+        final boolean destinationNotRunning = destinationState == ScheduledState.DISABLED
+            || destinationState == ScheduledState.STOPPED
+            || destinationState == ScheduledState.STOPPING;
+
+        if (!destinationNotRunning) {
+            return;
+        }
+
+        throw new FlowSynchronizationException("Cannot synchronize " + connection + " with proposed connection because doing so would require deleting the connection, " +
+            "and the connection has data queued while the destination is not running. The connection must be emptied before it can be removed.");
+    }
+
+    /**
+     * Returns the components that feed the given connection, looking through any funnels to their own sources.
+     *
+     * @param connection the connection to inspect, may be <code>null</code>
+     * @return the upstream components, or an empty set if the connection is <code>null</code>
+     */
+    private Set<Connectable> getUpstreamComponents(final Connection connection) {
+        if (connection != null) {
+            final Set<Connectable> upstream = new HashSet<>();
+            findUpstreamComponents(connection, upstream);
+            return upstream;
+        }
+
+        return Collections.emptySet();
+    }
+
+    /**
+     * Adds the non-funnel sources that feed the given connection to <code>components</code>. When the source is a
+     * funnel, the funnel's incoming connections are followed instead of adding the funnel itself.
+     *
+     * @param connection the connection whose sources should be collected
+     * @param components the set that receives the upstream components
+     */
+    private void findUpstreamComponents(final Connection connection, final Set<Connectable> components) {
+        collectUpstreamComponents(connection, components, new HashSet<>());
+    }
+
+    /**
+     * Recursive worker for findUpstreamComponents. Each connection is followed at most once so that funnels
+     * connected in a loop cannot cause unbounded recursion.
+     */
+    private void collectUpstreamComponents(final Connection connection, final Set<Connectable> components, final Set<Connection> visited) {
+        if (!visited.add(connection)) {
+            // Already followed this connection: we are inside a funnel loop.
+            return;
+        }
+
+        final Connectable source = connection.getSource();
+        if (source.getConnectableType() == ConnectableType.FUNNEL) {
+            source.getIncomingConnections().forEach(incoming -> collectUpstreamComponents(incoming, components, visited));
+        } else {
+            components.add(source);
+        }
+    }
+
+    /**
+     * Synchronizes a Connection with its proposed versioned state. A <code>null</code> connection adds one to the
+     * given group, a <code>null</code> proposed connection removes the existing one, and otherwise the existing
+     * connection is updated. Nothing happens when both are <code>null</code>.
+     *
+     * Upstream components, and then the destination, are stopped (or terminated) before the change is applied.
+     * The components that were stopped are started again afterwards, unless the connection was to be removed.
+     */
+    @Override
+    public void synchronize(final Connection connection, final VersionedConnection proposedConnection, final ProcessGroup group, final FlowSynchronizationOptions synchronizationOptions)
+        throws FlowSynchronizationException, TimeoutException {
+
+        if (connection == null && proposedConnection == null) {
+            return;
+        }
+
+        // Absolute deadline (epoch millis) by which components must have stopped.
+        final long timeout = System.currentTimeMillis() + synchronizationOptions.getComponentStopTimeout().toMillis();
+
+        // Stop any upstream components so that we can update the connection
+        final Set<Connectable> upstream = getUpstreamComponents(connection);
+        Set<Connectable> stoppedComponents;
+        try {
+            stoppedComponents = stopOrTerminate(upstream, timeout, synchronizationOptions);
+        } catch (final TimeoutException te) {
+            // Honor the configured timeout action: either propagate the timeout or forcibly terminate.
+            if (synchronizationOptions.getComponentStopTimeoutAction() == FlowSynchronizationOptions.ComponentStopTimeoutAction.THROW_TIMEOUT_EXCEPTION) {
+                throw te;
+            }
+
+            LOG.info("Components upstream of {} did not stop in time. Will terminate {}", connection, upstream);
+            terminateComponents(upstream);
+            // After termination every upstream component is treated as stopped by this method.
+            stoppedComponents = upstream;
+        }
+
+        try {
+            // Verify that we can synchronize the connection now that the sources are stopped.
+            verifyCanSynchronize(connection, proposedConnection);
+
+            // If the connection is to be deleted, wait for the queue to empty.
+            // NOTE(review): this passes the full component stop timeout rather than the time remaining until
+            // 'timeout', so the overall wait can exceed the configured timeout - confirm this is intended.
+            if (proposedConnection == null) {
+                try {
+                    waitForQueueEmpty(connection, synchronizationOptions.getComponentStopTimeout());
+                } catch (final InterruptedException ie) {
+                    // Restore the interrupt flag before translating to the exception type this method declares.
+                    Thread.currentThread().interrupt();
+                    throw new FlowSynchronizationException("Interrupted while waiting for FlowFile queue to empty for " + connection, ie);
+                }
+            }
+
+            // Stop destination component so that we can update the connection
+            if (connection != null) {
+                final Connectable destination = connection.getDestination();
+                final boolean stopped = stopOrTerminate(destination, timeout, synchronizationOptions);
+                if (stopped) {
+                    stoppedComponents.add(destination);
+                }
+            }
+
+            // Apply the change: add, remove or update.
+            if (connection == null) {
+                final Connection added = addConnection(group, proposedConnection, synchronizationOptions.getComponentIdGenerator());
+                LOG.info("Successfully synchronized {} by adding it to the flow", added);
+            } else if (proposedConnection == null) {
+                connection.getProcessGroup().removeConnection(connection);
+                LOG.info("Successfully synchronized {} by removing it from the flow", connection);
+            } else {
+                updateConnection(connection, proposedConnection);
+                LOG.info("Successfully synchronized {} by updating it to match proposed version", connection);
+            }
+        } finally {
+            // If not removing the connection, restart any component that we stopped.
+            // NOTE(review): when a removal fails with an exception, the stopped components are left stopped.
+            if (proposedConnection != null) {
+                for (final Connectable component : stoppedComponents) {
+                    context.getComponentScheduler().startComponent(component);
+                }
+            }
+        }
+    }
+
+    /**
+     * Blocks until the connection's FlowFile queue is empty, polling every 10 milliseconds.
+     *
+     * @param connection the connection whose queue should drain; a <code>null</code> connection returns immediately
+     * @param duration the maximum time to wait, measured from the moment this method is called
+     * @throws TimeoutException if the queue is still not empty once the duration has elapsed
+     * @throws InterruptedException if interrupted while sleeping between polls
+     */
+    private void waitForQueueEmpty(final Connection connection, final Duration duration) throws TimeoutException, InterruptedException {
+        if (connection == null) {
+            return;
+        }
+
+        final FlowFileQueue queue = connection.getFlowFileQueue();
+        final long deadline = System.currentTimeMillis() + duration.toMillis();
+
+        for (boolean drained = queue.isEmpty(); !drained; drained = queue.isEmpty()) {
+            final boolean deadlineReached = System.currentTimeMillis() >= deadline;
+            if (deadlineReached) {
+                throw new TimeoutException("Timed out waiting for " + connection + " to empty its FlowFiles");
+            }
+
+            Thread.sleep(10L);
+        }
+    }
+
+    /**
+     * Stops and then terminates every running processor among the given components. Components that are not
+     * processors, and processors that are not running, are left untouched.
+     *
+     * @param components the components to inspect
+     */
+    private void terminateComponents(final Set<Connectable> components) {
+        for (final Connectable candidate : components) {
+            if (candidate instanceof ProcessorNode) {
+                final ProcessorNode processorNode = (ProcessorNode) candidate;
+
+                if (processorNode.isRunning()) {
+                    processorNode.getProcessGroup().stopProcessor(processorNode);
+                    processorNode.terminate();
+                }
+            }
+        }
+    }
 
     private void updateConnection(final Connection connection, final VersionedConnection proposed) {
         LOG.debug("Updating connection from {} to {} with name {} and relationships {}: {}",
@@ -1922,6 +3090,86 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
         return null;
     }
 
+    /**
+     * Synchronizes a Reporting Task with its proposed versioned state. A <code>null</code> task adds a new one, a
+     * <code>null</code> proposed version removes the existing task, and otherwise the existing task is updated.
+     * Nothing happens when both are <code>null</code>. An existing task that is running is stopped first. The
+     * component scheduler is paused for the duration of the call and resumed afterwards.
+     */
+    @Override
+    public void synchronize(final ReportingTaskNode reportingTask, final VersionedReportingTask proposed, final FlowSynchronizationOptions synchronizationOptions)
+        throws FlowSynchronizationException, TimeoutException, InterruptedException {
+
+        final boolean nothingToSynchronize = reportingTask == null && proposed == null;
+        if (nothingToSynchronize) {
+            return;
+        }
+
+        synchronizationOptions.getComponentScheduler().pause();
+        try {
+            // No existing task: create one from the proposed version.
+            if (reportingTask == null) {
+                final ReportingTaskNode created = addReportingTask(proposed);
+                LOG.info("Successfully synchronized {} by adding it to the flow", created);
+                return;
+            }
+
+            // An existing task must not be running while it is removed or updated.
+            if (reportingTask.isRunning()) {
+                reportingTask.stop();
+            }
+
+            if (proposed == null) {
+                reportingTask.verifyCanDelete();
+                context.getFlowManager().removeReportingTask(reportingTask);
+                LOG.info("Successfully synchronized {} by removing it from the flow", reportingTask);
+            } else {
+                updateReportingTask(reportingTask, proposed);
+                LOG.info("Successfully synchronized {} by updating it to match proposed version", reportingTask);
+            }
+        } finally {
+            synchronizationOptions.getComponentScheduler().resume();
+        }
+    }
+
+    /**
+     * Creates a Reporting Task from the given versioned task, using its type, instance identifier and bundle, and
+     * then applies the versioned configuration to it.
+     *
+     * @param reportingTask the versioned Reporting Task to create
+     * @return the newly created and configured Reporting Task node
+     */
+    private ReportingTaskNode addReportingTask(final VersionedReportingTask reportingTask) {
+        final BundleCoordinate bundleCoordinate = toCoordinate(reportingTask.getBundle());
+
+        final ReportingTaskNode createdNode = context.getFlowManager().createReportingTask(
+            reportingTask.getType(), reportingTask.getInstanceIdentifier(), bundleCoordinate, false);
+
+        updateReportingTask(createdNode, reportingTask);
+        return createdNode;
+    }
+
+    /**
+     * Applies the proposed configuration to the Reporting Task and then moves the task into the proposed scheduled
+     * state. Validation triggering is paused while the configuration is applied and is always resumed afterwards.
+     *
+     * NOTE(review): the switch on <code>proposed.getScheduledState()</code> throws a NullPointerException if the
+     * proposed scheduled state is <code>null</code> - confirm whether callers guarantee it is set.
+     *
+     * @param reportingTask the Reporting Task to update
+     * @param proposed the proposed/desired state for the Reporting Task
+     */
+    private void updateReportingTask(final ReportingTaskNode reportingTask, final VersionedReportingTask proposed) {
+        LOG.debug("Updating Reporting Task {}", reportingTask);
+
+        reportingTask.pauseValidationTrigger();
+        try {
+            reportingTask.setName(proposed.getName());
+            reportingTask.setComments(proposed.getComments());
+            reportingTask.setSchedulingPeriod(proposed.getSchedulingPeriod());
+            reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(proposed.getSchedulingStrategy()));
+
+            reportingTask.setAnnotationData(proposed.getAnnotationData());
+            reportingTask.setProperties(proposed.getProperties());
+
+            // enable/disable/start according to the ScheduledState
+            switch (proposed.getScheduledState()) {
+                case DISABLED:
+                    // Stop the task first if it is running, then disable it.
+                    if (reportingTask.isRunning()) {
+                        reportingTask.stop();
+                    }
+                    reportingTask.disable();
+                    break;
+                case ENABLED:
+                    // Enabled but not running: enable a disabled task, or stop a running one.
+                    if (reportingTask.getScheduledState() == org.apache.nifi.controller.ScheduledState.DISABLED) {
+                        reportingTask.enable();
+                    } else if (reportingTask.isRunning()) {
+                        reportingTask.stop();
+                    }
+                    break;
+                case RUNNING:
+                    // Enable the task first if it is disabled, then start it unless it is already running.
+                    if (reportingTask.getScheduledState() == org.apache.nifi.controller.ScheduledState.DISABLED) {
+                        reportingTask.enable();
+                    }
+                    if (!reportingTask.isRunning()) {
+                        reportingTask.start();
+                    }
+                    break;
+            }
+        } finally {
+            reportingTask.resumeValidationTrigger();
+        }
+    }
+
+    /**
+     * Tells whether the given id identifies the component, either as its instance identifier or as its versioned
+     * component id. When the component has no versioned component id, one generated from its instance identifier
+     * by NiFiRegistryFlowMapper is used for the comparison.
+     *
+     * @param component the component to test
+     * @param id the identifier to compare against; must not be <code>null</code> because <code>equals</code> is invoked on it
+     * @return <code>true</code> if the id matches either identifier
+     */
     private <T extends org.apache.nifi.components.VersionedComponent & Connectable> boolean matchesId(final T component, final String id) {
         return id.equals(component.getIdentifier()) || id.equals(component.getVersionedComponentId().orElse(NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())));
     }
@@ -1931,7 +3179,6 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
             NiFiRegistryFlowMapper.generateVersionedComponentId(group.getIdentifier())).equals(groupId);
     }
 
-
     private void findAllProcessors(final VersionedProcessGroup group, final Map<String, VersionedProcessor> map) {
         for (final VersionedProcessor processor : group.getProcessors()) {
             map.put(processor.getIdentifier(), processor);
@@ -2026,7 +3273,6 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
         }
     }
 
-
     private Set<String> getKnownVariableNames(final ProcessGroup group) {
         final Set<String> variableNames = new HashSet<>();
         populateKnownVariableNames(group, variableNames);
@@ -2059,4 +3305,5 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
 
         return getVersionedControllerService(group.getParent(), versionedComponentId);
     }
+
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/VersionedComponentSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/VersionedComponentSynchronizer.java
new file mode 100644
index 0000000000..04b2491cd2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/VersionedComponentSynchronizer.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.flow.synchronization;
+
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.exception.ProcessorInstantiationException;
+import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedExternalFlow;
+import org.apache.nifi.flow.VersionedFunnel;
+import org.apache.nifi.flow.VersionedLabel;
+import org.apache.nifi.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedPort;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flow.VersionedRemoteProcessGroup;
+import org.apache.nifi.flow.VersionedReportingTask;
+import org.apache.nifi.groups.FlowSynchronizationOptions;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.parameter.ParameterContext;
+
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Synchronizes the components of a flow (Process Groups, Processors, Controller Services, Connections, Ports,
+ * Funnels, Labels, Reporting Tasks, Remote Process Groups and Parameter Contexts) with a proposed versioned
+ * representation. For the per-component methods documented below, a <code>null</code> existing component means
+ * "add" and a <code>null</code> proposed version means "delete".
+ */
+public interface VersionedComponentSynchronizer {
+
+    /**
+     * Synchronize the given Process Group to match the proposed flow
+     * @param group the Process Group to update
+     * @param proposedFlow the proposed/desired state for the process group
+     * @param synchronizationOptions options for how to synchronize the group
+     *
+     * @throws ProcessorInstantiationException presumably if a Processor required by the proposed flow cannot be instantiated (TODO confirm against the implementation)
+     */
+    void synchronize(ProcessGroup group, VersionedExternalFlow proposedFlow, FlowSynchronizationOptions synchronizationOptions) throws ProcessorInstantiationException;
+
+    /**
+     * Verifies that the given Process Group can be updated to match the proposed flow
+     * @param group the group to update
+     * @param proposed the proposed updated version
+     * @param verifyConnectionRemoval if <code>true</code> and synchronizing the Process Group would result in any Connection being removed, an IllegalStateException will be thrown if that
+     * Connection has data in it. If <code>false</code>, the presence of data in removed queues will be ignored.
+     *
+     * @throws IllegalStateException if <code>verifyConnectionRemoval</code> is <code>true</code> and a Connection that would be removed has data in it
+     */
+    void verifyCanSynchronize(ProcessGroup group, VersionedProcessGroup proposed, boolean verifyConnectionRemoval);
+
+
+    /**
+     * Synchronizes the given Processor to match the proposed snapshot, or deletes the Processor if the proposed snapshot is <code>null</code>. If the given processor is <code>null</code>,
+     * adds the processor to the given ProcessGroup
+     *
+     * @param processor the processor to synchronize
+     * @param proposedProcessor the proposed/desired state for the processor
+     * @param group the ProcessGroup to which the ProcessorNode should belong.
+     * @param synchronizationOptions options for how to synchronize the flow
+     *
+     * @throws FlowSynchronizationException if unable to synchronize the processor with the proposed version
+     * @throws TimeoutException if the processor must be stopped in order to synchronize it with the proposed version and stopping takes longer than the timeout allowed by the
+     * {@link FlowSynchronizationOptions synchronization options}.
+     * @throws InterruptedException if interrupted while waiting for processor to stop or outbound connections to empty if processor is being removed
+     */
+    void synchronize(ProcessorNode processor, VersionedProcessor proposedProcessor, ProcessGroup group, FlowSynchronizationOptions synchronizationOptions)
+        throws FlowSynchronizationException, TimeoutException, InterruptedException;
+
+    /**
+     * Synchronizes the given Controller Service to match the proposed version, or deletes the Controller Service if the proposed snapshot is <code>null</code>. If the given Controller Service is
+     * <code>null</code>, adds the Controller Service to the given Process Group
+     *
+     * @param controllerService the Controller Service to synchronize
+     * @param proposed the proposed/desired state for the controller service
+     * @param group the ProcessGroup to which the Controller Service should belong
+     * @param synchronizationOptions options for how to synchronize the flow
+     *
+     * @throws FlowSynchronizationException if unable to synchronize the Controller Service with the proposed version
+     * @throws TimeoutException if the Controller Service must be disabled in order to synchronize it with the proposed version and disabling takes longer than the timeout allowed by the
+     * {@link FlowSynchronizationOptions synchronization options}.
+     * @throws InterruptedException if interrupted while waiting for Controller Service to disable
+     */
+    void synchronize(ControllerServiceNode controllerService, VersionedControllerService proposed, ProcessGroup group, FlowSynchronizationOptions synchronizationOptions)
+        throws FlowSynchronizationException, TimeoutException, InterruptedException;
+
+    /**
+     * Synchronizes the given Connection to match the proposed one, or deletes the Connection if the proposed is <code>null</code>. If the given connection is <code>null</code>, adds the connection
+     * to the given ProcessGroup
+     * @param connection the connection to synchronize
+     * @param proposedConnection the proposed/desired state for the connection
+     * @param group the ProcessGroup to which the connection should belong
+     * @param synchronizationOptions options for how to synchronize the flow
+     *
+     * @throws IllegalStateException if the connection cannot be updated due to the state of the flow
+     * @throws FlowSynchronizationException if unable to synchronize the connection with the proposed version
+     * @throws TimeoutException if the source or destination of the connection must be stopped in order to perform the synchronization and stopping it takes longer than the timeout allowed by the
+     * {@link FlowSynchronizationOptions synchronization options}.
+     */
+    void synchronize(Connection connection, VersionedConnection proposedConnection, ProcessGroup group, FlowSynchronizationOptions synchronizationOptions)
+        throws FlowSynchronizationException, TimeoutException;
+
+
+    /**
+     * Synchronizes the given Port to match the proposed one, or deletes the Port if the proposed is <code>null</code>. If the given Port is <code>null</code>, creates it and adds it to the given
+     * ProcessGroup
+     *
+     * @param port the port to synchronize
+     * @param proposed the proposed/desired state for the port
+     * @param group the ProcessGroup to which the port should belong
+     * @param synchronizationOptions options for how to synchronize the flow
+     *
+     * @throws IllegalStateException if the port cannot be updated due to the state of the flow
+     * @throws FlowSynchronizationException if unable to synchronize the port with the proposed version
+     * @throws TimeoutException if the port is running and takes longer to stop than the timeout allowed by the {@link FlowSynchronizationOptions synchronization options}.
+     * @throws InterruptedException if the calling thread is interrupted while the synchronization is in progress
+     */
+    void synchronize(Port port, VersionedPort proposed, ProcessGroup group, FlowSynchronizationOptions synchronizationOptions) throws FlowSynchronizationException, TimeoutException,
+        InterruptedException;
+
+
+    /**
+     * Synchronizes the given Funnel to match the proposed one, or deletes the Funnel if the proposed is <code>null</code>. If the given Funnel is <code>null</code>, creates it and adds it to the
+     * given ProcessGroup
+     *
+     * @param funnel the funnel to synchronize
+     * @param proposed the proposed/desired state for the funnel
+     * @param group the ProcessGroup to which the funnel should belong
+     * @param synchronizationOptions options for how to synchronize the flow
+     *
+     * @throws IllegalStateException if the funnel cannot be updated due to the state of the flow
+     * @throws FlowSynchronizationException if unable to synchronize the funnel with the proposed version
+     * @throws TimeoutException if the funnel is being removed and downstream components take longer to stop than the timeout allowed by the {@link FlowSynchronizationOptions synchronization options}.
+     * @throws InterruptedException if the calling thread is interrupted while the synchronization is in progress
+     */
+    void synchronize(Funnel funnel, VersionedFunnel proposed, ProcessGroup group, FlowSynchronizationOptions synchronizationOptions) throws FlowSynchronizationException, TimeoutException,
+        InterruptedException;
+
+    /**
+     * Synchronizes the given Label to match the proposed one, or deletes the Label if the proposed is <code>null</code>. If the given Label is <code>null</code>, creates it and adds it to the
+     * given ProcessGroup
+     *
+     * @param label the label to synchronize
+     * @param proposed the proposed/desired state for the label
+     * @param group the ProcessGroup to which the label should belong
+     * @param synchronizationOptions options for how to synchronize the flow
+     */
+    void synchronize(Label label, VersionedLabel proposed, ProcessGroup group, FlowSynchronizationOptions synchronizationOptions);
+
+    /**
+     * Synchronizes the given Reporting Task to match the proposed one, or deletes the Reporting Task if the proposed is <code>null</code>. If the given Reporting Task is <code>null</code>,
+     * creates it
+     *
+     * @param reportingTask the reporting task to synchronize
+     * @param proposed the proposed/desired state
+     * @param synchronizationOptions options for how to synchronize the flow
+     *
+     * @throws IllegalStateException if the reporting task cannot be updated due to the state of the flow
+     * @throws FlowSynchronizationException if unable to synchronize the reporting task with the proposed version
+     * @throws TimeoutException if the reporting task is being removed and takes longer to stop than the timeout allowed by the {@link FlowSynchronizationOptions synchronization options}.
+     * @throws InterruptedException if the calling thread is interrupted while the synchronization is in progress
+     */
+    void synchronize(ReportingTaskNode reportingTask, VersionedReportingTask proposed, FlowSynchronizationOptions synchronizationOptions)
+        throws FlowSynchronizationException, TimeoutException, InterruptedException;
+
+
+    /**
+     * Synchronizes the given Remote Process Group to match the proposed one, or deletes the rpg if the proposed is <code>null</code>. If the given rpg is <code>null</code>, creates it and adds
+     * it to the given ProcessGroup
+     *
+     * @param rpg the rpg to synchronize
+     * @param proposed the proposed/desired state for the rpg
+     * @param group the ProcessGroup to which the rpg should belong
+     * @param synchronizationOptions options for how to synchronize the flow
+     *
+     * @throws IllegalStateException if the rpg cannot be updated due to the state of the flow
+     * @throws FlowSynchronizationException if unable to synchronize the rpg with the proposed version
+     * @throws TimeoutException if the rpg is being removed and takes longer to stop than the timeout allowed by the {@link FlowSynchronizationOptions synchronization options}.
+     * @throws InterruptedException if the calling thread is interrupted while the synchronization is in progress
+     */
+    void synchronize(RemoteProcessGroup rpg, VersionedRemoteProcessGroup proposed, ProcessGroup group, FlowSynchronizationOptions synchronizationOptions)
+        throws FlowSynchronizationException, TimeoutException, InterruptedException;
+
+
+    /**
+     * Synchronizes the given Parameter Context with the proposed one. NOTE(review): presumably a <code>null</code> proposed context deletes the Parameter Context and a <code>null</code>
+     * Parameter Context creates it, as with the other component types - confirm against the implementation.
+     *
+     * @param parameterContext the parameter context to synchronize
+     * @param proposed the proposed/desired state for the parameter context
+     * @param synchronizationOptions options for how to synchronize the flow
+     *
+     * @throws FlowSynchronizationException if unable to synchronize the parameter context with the proposed version
+     * @throws TimeoutException presumably if components must be stopped in order to perform the synchronization and stopping takes longer than the timeout allowed by the
+     * {@link FlowSynchronizationOptions synchronization options} (TODO confirm)
+     * @throws InterruptedException if the calling thread is interrupted while the synchronization is in progress
+     */
+    void synchronize(ParameterContext parameterContext, VersionedParameterContext proposed, FlowSynchronizationOptions synchronizationOptions)
+        throws FlowSynchronizationException, TimeoutException, InterruptedException;
+
+    /**
+     * Synchronizes the settings of the given Process Group with the proposed version. NOTE(review): judging by the name, this covers the group's own settings rather than its child
+     * components, and <code>parentGroup</code> is presumably the group under which a missing Process Group would be created - confirm against the implementation.
+     *
+     * @param processGroup the process group to synchronize
+     * @param proposed the proposed/desired state for the process group
+     * @param parentGroup the parent of the process group
+     * @param synchronizationOptions options for how to synchronize the flow
+     *
+     * @throws FlowSynchronizationException if unable to synchronize the process group with the proposed version
+     * @throws TimeoutException presumably if components must be stopped in order to perform the synchronization and stopping takes longer than the timeout allowed by the
+     * {@link FlowSynchronizationOptions synchronization options} (TODO confirm)
+     * @throws InterruptedException if the calling thread is interrupted while the synchronization is in progress
+     * @throws ProcessorInstantiationException presumably if a Processor required by the proposed version cannot be instantiated (TODO confirm)
+     */
+    void synchronizeProcessGroupSettings(ProcessGroup processGroup, VersionedProcessGroup proposed, ProcessGroup parentGroup, FlowSynchronizationOptions synchronizationOptions)
+        throws FlowSynchronizationException, TimeoutException, InterruptedException, ProcessorInstantiationException;
+
+
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/VersionedFlowSynchronizationContext.java
similarity index 93%
rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizationContext.java
rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/VersionedFlowSynchronizationContext.java
index 4e18031bfd..ebb9ec3903 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/VersionedFlowSynchronizationContext.java
@@ -15,12 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.nifi.groups;
+package org.apache.nifi.flow.synchronization;
 
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReloadComponent;
 import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.groups.ComponentIdGenerator;
+import org.apache.nifi.groups.ComponentScheduler;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.registry.flow.FlowRegistryClient;
@@ -30,7 +32,7 @@ import java.util.function.Function;
 
 import static java.util.Objects.requireNonNull;
 
-public class ProcessGroupSynchronizationContext {
+public class VersionedFlowSynchronizationContext {
     private final ComponentIdGenerator componentIdGenerator;
     private final FlowManager flowManager;
     private final FlowRegistryClient flowRegistryClient;
@@ -42,7 +44,7 @@ public class ProcessGroupSynchronizationContext {
     private final Function<ProcessorNode, ProcessContext> processContextFactory;
 
 
-    private ProcessGroupSynchronizationContext(final Builder builder) {
+    private VersionedFlowSynchronizationContext(final Builder builder) {
         this.componentIdGenerator = builder.componentIdGenerator;
         this.flowManager = builder.flowManager;
         this.flowRegistryClient = builder.flowRegistryClient;
@@ -146,7 +148,7 @@ public class ProcessGroupSynchronizationContext {
             return this;
         }
 
-        public ProcessGroupSynchronizationContext build() {
+        public VersionedFlowSynchronizationContext build() {
             requireNonNull(componentIdGenerator, "Component ID Generator must be set");
             requireNonNull(flowManager, "Flow Manager must be set");
             requireNonNull(flowRegistryClient, "Flow Registry Client must be set");
@@ -156,7 +158,7 @@ public class ProcessGroupSynchronizationContext {
             requireNonNull(componentScheduler, "Component Scheduler must be set");
             requireNonNull(flowMappingOptions, "Flow Mapping Options must be set");
             requireNonNull(processContextFactory, "Process Context Factory must be set");
-            return new ProcessGroupSynchronizationContext(this);
+            return new VersionedFlowSynchronizationContext(this);
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/DefaultComponentScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/DefaultComponentScheduler.java
index a54ec61ab6..32181b7413 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/DefaultComponentScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/DefaultComponentScheduler.java
@@ -23,6 +23,7 @@ import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.registry.flow.mapping.VersionedComponentStateLookup;
+import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.remote.RemoteGroupPort;
 
 import java.util.Collection;
@@ -63,4 +64,8 @@ public class DefaultComponentScheduler extends AbstractComponentScheduler {
     protected void enableNow(final Collection<ControllerServiceNode> controllerServices) {
         getControllerServiceProvider().enableControllerServices(controllerServices);
     }
+
+    protected void startNow(final ReportingTaskNode reportingTask) {
+        reportingTask.start(); // starts the task right away by calling start() on the node itself
+    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizer.java
deleted file mode 100644
index 25cea8802b..0000000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizer.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.groups;
-
-import org.apache.nifi.controller.exception.ProcessorInstantiationException;
-import org.apache.nifi.flow.VersionedExternalFlow;
-import org.apache.nifi.flow.VersionedProcessGroup;
-
-public interface ProcessGroupSynchronizer {
-
-    /**
-     * Synchronize the given Process Group to match the proposed snaphsot
-     * @param group the Process Group to update
-     * @param proposedFlow the proposed/desired state for the process group
-     * @param synchronizationOptions options for how to synchronize the group
-     */
-    void synchronize(ProcessGroup group, VersionedExternalFlow proposedFlow, GroupSynchronizationOptions synchronizationOptions) throws ProcessorInstantiationException;
-
-    void verifyCanSynchronize(ProcessGroup group, VersionedProcessGroup proposed, boolean verifyConnectionRemoval);
-
-}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/RetainExistingStateComponentScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/RetainExistingStateComponentScheduler.java
index b6ed741889..bafb091d47 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/RetainExistingStateComponentScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/RetainExistingStateComponentScheduler.java
@@ -18,6 +18,7 @@
 package org.apache.nifi.groups;
 
 import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.Triggerable;
 import org.apache.nifi.controller.service.ControllerServiceNode;
@@ -100,6 +101,11 @@ public class RetainExistingStateComponentScheduler implements ComponentScheduler
         delegate.disableControllerServicesAsync(controllerServices);
     }
 
+    @Override
+    public void startReportingTask(final ReportingTaskNode reportingTask) {
+        delegate.startReportingTask(reportingTask); // pure pass-through to the delegate scheduler; this method performs no state check of its own
+    }
+
     @Override
     public void pause() {
         delegate.pause();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 1924f9bcfb..8960cefad4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -66,6 +66,8 @@ import org.apache.nifi.controller.service.StandardConfigurationContext;
 import org.apache.nifi.encrypt.PropertyEncryptor;
 import org.apache.nifi.flow.VersionedExternalFlow;
 import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.synchronization.StandardVersionedComponentSynchronizer;
+import org.apache.nifi.flow.synchronization.VersionedFlowSynchronizationContext;
 import org.apache.nifi.logging.LogRepository;
 import org.apache.nifi.logging.LogRepositoryFactory;
 import org.apache.nifi.nar.ExtensionManager;
@@ -3787,7 +3789,7 @@ public final class StandardProcessGroup implements ProcessGroup {
         final ComponentScheduler defaultComponentScheduler = new DefaultComponentScheduler(controllerServiceProvider, stateLookup);
         final ComponentScheduler retainExistingStateScheduler = new RetainExistingStateComponentScheduler(this, defaultComponentScheduler);
 
-        final GroupSynchronizationOptions synchronizationOptions = new GroupSynchronizationOptions.Builder()
+        final FlowSynchronizationOptions synchronizationOptions = new FlowSynchronizationOptions.Builder()
             .componentIdGenerator(idGenerator)
             .componentScheduler(retainExistingStateScheduler)
             .ignoreLocalModifications(!verifyNotDirty)
@@ -3818,14 +3820,14 @@ public final class StandardProcessGroup implements ProcessGroup {
     }
 
     @Override
-    public void synchronizeFlow(final VersionedExternalFlow proposedSnapshot, final GroupSynchronizationOptions synchronizationOptions, final FlowMappingOptions flowMappingOptions) {
+    public void synchronizeFlow(final VersionedExternalFlow proposedSnapshot, final FlowSynchronizationOptions synchronizationOptions, final FlowMappingOptions flowMappingOptions) {
         writeLock.lock();
         try {
             verifyCanUpdate(proposedSnapshot, true, !synchronizationOptions.isIgnoreLocalModifications());
 
-            final ProcessGroupSynchronizationContext groupSynchronizationContext = createGroupSynchronizationContext(
+            final VersionedFlowSynchronizationContext groupSynchronizationContext = createGroupSynchronizationContext(
                 synchronizationOptions.getComponentIdGenerator(), synchronizationOptions.getComponentScheduler(), flowMappingOptions);
-            final StandardProcessGroupSynchronizer synchronizer = new StandardProcessGroupSynchronizer(groupSynchronizationContext);
+            final StandardVersionedComponentSynchronizer synchronizer = new StandardVersionedComponentSynchronizer(groupSynchronizationContext);
 
             final StandardVersionControlInformation originalVci = this.versionControlInfo.get();
             try {
@@ -3963,9 +3965,9 @@ public final class StandardProcessGroup implements ProcessGroup {
             }
 
             final ComponentIdGenerator componentIdGenerator = (proposedId, instanceId, destinationGroupId) -> proposedId;
-            final ProcessGroupSynchronizationContext groupSynchronizationContext = createGroupSynchronizationContext(
+            final VersionedFlowSynchronizationContext groupSynchronizationContext = createGroupSynchronizationContext(
                 componentIdGenerator, ComponentScheduler.NOP_SCHEDULER, FlowMappingOptions.DEFAULT_OPTIONS);
-            final StandardProcessGroupSynchronizer synchronizer = new StandardProcessGroupSynchronizer(groupSynchronizationContext);
+            final StandardVersionedComponentSynchronizer synchronizer = new StandardVersionedComponentSynchronizer(groupSynchronizationContext);
 
             synchronizer.verifyCanSynchronize(this, updatedFlow.getFlowContents(), verifyConnectionRemoval);
         } finally {
@@ -3973,9 +3975,9 @@ public final class StandardProcessGroup implements ProcessGroup {
         }
     }
 
-    private ProcessGroupSynchronizationContext createGroupSynchronizationContext(final ComponentIdGenerator componentIdGenerator, final ComponentScheduler componentScheduler,
-                                                                                 final FlowMappingOptions flowMappingOptions) {
-        return new ProcessGroupSynchronizationContext.Builder()
+    private VersionedFlowSynchronizationContext createGroupSynchronizationContext(final ComponentIdGenerator componentIdGenerator, final ComponentScheduler componentScheduler,
+                                                                                  final FlowMappingOptions flowMappingOptions) {
+        return new VersionedFlowSynchronizationContext.Builder()
             .componentIdGenerator(componentIdGenerator)
             .flowManager(flowManager)
             .flowRegistryClient(flowRegistryClient)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index 3c0954d2e2..1eb226701b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -497,7 +497,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
                     if (sendPort == null) {
                         sendPort = addInputPort(descriptor);
                         logger.info("Added Input Port {} with Name {} and Target Identifier {} to {}", sendPort.getIdentifier(), sendPort.getName(), sendPort.getTargetIdentifier(), this);
-                    } else {
+                    } else if (descriptor.getTargetId() != null) {
                         final String previousTargetId = sendPort.getTargetIdentifier();
                         sendPort.setTargetIdentifier(descriptor.getTargetId());
                         logger.info("Updated Target identifier for Input Port with Name {} from {} to {} for {}", descriptor.getName(), previousTargetId, descriptor.getTargetId(), this);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java
new file mode 100644
index 0000000000..235cb4df82
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java
@@ -0,0 +1,1025 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.flow.synchronization;
+
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.ComponentNode;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReloadComponent;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.LoadBalanceStrategy;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.ControllerServiceReference;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ComponentType;
+import org.apache.nifi.flow.ConnectableComponent;
+import org.apache.nifi.flow.ConnectableComponentType;
+import org.apache.nifi.flow.Position;
+import org.apache.nifi.flow.ScheduledState;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedParameter;
+import org.apache.nifi.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedPort;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.groups.ComponentIdGenerator;
+import org.apache.nifi.groups.ComponentScheduler;
+import org.apache.nifi.groups.FlowSynchronizationOptions;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.parameter.Parameter;
+import org.apache.nifi.parameter.ParameterContext;
+import org.apache.nifi.parameter.ParameterContextManager;
+import org.apache.nifi.parameter.ParameterDescriptor;
+import org.apache.nifi.parameter.ParameterReferenceManager;
+import org.apache.nifi.parameter.StandardParameterContext;
+import org.apache.nifi.parameter.StandardParameterContextManager;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.mapping.FlowMappingOptions;
+import org.apache.nifi.scheduling.ExecutionNode;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.anySet;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class StandardVersionedComponentSynchronizerTest {
+
+    private ProcessorNode processorA;
+    private ProcessorNode processorB;
+    private Connection connectionAB;
+    private Port inputPort;
+    private Port outputPort;
+    private StandardVersionedComponentSynchronizer synchronizer;
+    private FlowSynchronizationOptions synchronizationOptions;
+    private ProcessGroup group;
+    private ComponentScheduler componentScheduler;
+    private ComponentIdGenerator componentIdGenerator;
+    private ControllerServiceProvider controllerServiceProvider;
+    private ParameterContextManager parameterContextManager;
+    private ParameterReferenceManager parameterReferenceManager;
+
+    private final Set<String> queuesWithData = Collections.synchronizedSet(new HashSet<>());
+    private final Bundle bundle = new Bundle("group", "artifact", "version 1.0");
+
+    @Before
+    public void setup() {
+        // Fresh fixture per test: mocked framework services, a real parameter context manager, and a mocked group with two processors, one connection and two ports.
+        final ExtensionManager extensionManager = Mockito.mock(ExtensionManager.class);
+        final FlowManager flowManager = Mockito.mock(FlowManager.class);
+        controllerServiceProvider = Mockito.mock(ControllerServiceProvider.class);
+        final Function<ProcessorNode, ProcessContext> processContextFactory = proc -> Mockito.mock(ProcessContext.class);
+        final ReloadComponent reloadComponent = Mockito.mock(ReloadComponent.class);
+        final FlowRegistryClient flowRegistryClient = Mockito.mock(FlowRegistryClient.class);
+        componentIdGenerator = (proposed, instance, group) -> proposed == null ? instance : proposed; // use the proposed ID when present, otherwise the instance ID
+        componentScheduler = Mockito.mock(ComponentScheduler.class);
+        parameterContextManager = new StandardParameterContextManager();
+        parameterReferenceManager = Mockito.mock(ParameterReferenceManager.class);
+        // Stub the FlowManager: hand out a mocked controller service, expose the real parameter context manager, and run resolution callbacks inline.
+        when(flowManager.createControllerService(anyString(), anyString(), any(BundleCoordinate.class), anySet(), anyBoolean(), anyBoolean(), nullable(String.class)))
+            .thenReturn(Mockito.mock(ControllerServiceNode.class));
+        when(flowManager.getParameterContextManager()).thenReturn(parameterContextManager);
+        doAnswer(invocation -> {
+            invocation.getArgument(0, Runnable.class).run();
+            return null;
+        }).when(flowManager).withParameterContextResolution(any(Runnable.class));
+        doAnswer(invocation -> {
+            final String id = invocation.getArgument(0, String.class);
+            final String name = invocation.getArgument(1, String.class);
+            final ParameterContext parameterContext = new StandardParameterContext(id, name, parameterReferenceManager, null);
+            // Argument 2 carries the parameters; argument 3 carries the IDs of inherited contexts, which are resolved through the manager.
+            final Map<String, Parameter> parameterMap = invocation.getArgument(2, Map.class);
+            parameterContext.setParameters(parameterMap);
+
+            final List<String> inheritedContextIds = invocation.getArgument(3, List.class);
+            final List<ParameterContext> inheritedContexts = inheritedContextIds.stream()
+                .map(parameterContextManager::getParameterContext)
+                .collect(Collectors.toList());
+            parameterContext.setInheritedParameterContexts(inheritedContexts);
+            // Register the new context so that contexts created afterwards can resolve it by ID as an inherited context.
+            parameterContextManager.addParameterContext(parameterContext);
+
+            return parameterContext;
+        }).when(flowManager).createParameterContext(anyString(), anyString(), anyMap(), anyList());
+        // Assemble the synchronization context from the collaborators created above.
+        final VersionedFlowSynchronizationContext context = new VersionedFlowSynchronizationContext.Builder()
+            .componentIdGenerator(componentIdGenerator)
+            .componentScheduler(componentScheduler)
+            .extensionManager(extensionManager)
+            .flowManager(flowManager)
+            .controllerServiceProvider(controllerServiceProvider)
+            .flowMappingOptions(FlowMappingOptions.DEFAULT_OPTIONS)
+            .processContextFactory(processContextFactory)
+            .reloadComponent(reloadComponent)
+            .flowRegistryClient(flowRegistryClient)
+            .build();
+        // The group must be created before the components below, because their getProcessGroup() stubs capture its current value.
+        group = Mockito.mock(ProcessGroup.class);
+
+        processorA = createMockProcessor();
+        processorB = createMockProcessor();
+        inputPort = createMockPort(ConnectableType.INPUT_PORT);
+        outputPort = createMockPort(ConnectableType.OUTPUT_PORT);
+        connectionAB = createMockConnection(processorA, processorB, group);
+        when(group.getProcessors()).thenReturn(Arrays.asList(processorA, processorB));
+        when(group.getInputPorts()).thenReturn(Collections.singleton(inputPort));
+        when(group.getOutputPorts()).thenReturn(Collections.singleton(outputPort));
+        // Default options; individual tests may replace this field with their own.
+        synchronizationOptions = new FlowSynchronizationOptions.Builder()
+            .componentIdGenerator(componentIdGenerator)
+            .componentScheduler(componentScheduler)
+            .build();
+
+        synchronizer = new StandardVersionedComponentSynchronizer(context);
+        // Start each test with every mocked connection queue reporting itself as empty.
+        queuesWithData.clear();
+    }
+
+    private FlowSynchronizationOptions createQuickFailSynchronizationOptions(final FlowSynchronizationOptions.ComponentStopTimeoutAction timeoutAction) {
+        // Same ID generator and scheduler as the default options, but components get only 10 milliseconds to stop.
+        final Duration stopTimeout = Duration.ofMillis(10);
+        final FlowSynchronizationOptions.Builder optionsBuilder = new FlowSynchronizationOptions.Builder()
+            .componentIdGenerator(componentIdGenerator).componentScheduler(componentScheduler)
+            .componentStopTimeout(stopTimeout).componentStopTimeoutAction(timeoutAction);
+        return optionsBuilder.build();
+    }
+
+    private ProcessorNode createMockProcessor() {
+        // A stopped, non-running processor with a random ID that belongs to the test's process group.
+        final ProcessorNode mockProcessor = Mockito.mock(ProcessorNode.class);
+        instrumentComponentNodeMethods(UUID.randomUUID().toString(), mockProcessor);
+
+        when(mockProcessor.getConnectableType()).thenReturn(ConnectableType.PROCESSOR);
+        when(mockProcessor.getProcessGroup()).thenReturn(group);
+        when(mockProcessor.getScheduledState()).thenReturn(org.apache.nifi.controller.ScheduledState.STOPPED);
+        when(mockProcessor.isRunning()).thenReturn(false);
+
+        return mockProcessor;
+    }
+
+    private ControllerServiceNode createMockControllerService() {
+        // A disabled, inactive controller service with a random ID
+        // that belongs to the test's process group.
+        final ControllerServiceNode mockService = Mockito.mock(ControllerServiceNode.class);
+        instrumentComponentNodeMethods(UUID.randomUUID().toString(), mockService);
+
+        when(mockService.getState()).thenReturn(ControllerServiceState.DISABLED);
+        when(mockService.getProcessGroup()).thenReturn(group);
+        when(mockService.isActive()).thenReturn(false);
+
+        return mockService;
+    }
+
+    private void instrumentComponentNodeMethods(final String uuid, final ComponentNode component) {
+        // Stub the identifier, bundle and (empty) properties that every mocked component node reports.
+        when(component.getIdentifier()).thenReturn(uuid);
+        when(component.getBundleCoordinate()).thenReturn(new BundleCoordinate("group", "artifact", "version 1.0"));
+        when(component.getProperties()).thenReturn(Collections.emptyMap());
+
+        // Any property name resolves to a freshly built descriptor carrying that same name.
+        when(component.getPropertyDescriptor(anyString()))
+            .thenAnswer(invocation -> new PropertyDescriptor.Builder().name(invocation.getArgument(0, String.class)).build());
+    }
+
+    private Port createMockPort(final ConnectableType connectableType) {
+        // A non-running port of the requested type with a random ID, owned by the test's process group.
+        final Port mockPort = Mockito.mock(Port.class);
+
+        when(mockPort.getConnectableType()).thenReturn(connectableType);
+        when(mockPort.getIdentifier()).thenReturn(UUID.randomUUID().toString());
+        when(mockPort.getProcessGroup()).thenReturn(group);
+        when(mockPort.isRunning()).thenReturn(false);
+
+        return mockPort;
+    }
+
+
+    private Connection createMockConnection(final Connectable source, final Connectable destination, final ProcessGroup group) {
+        // Builds a mocked connection from source to destination and re-stubs source, destination and group so that each of them reports it.
+        final String uuid = UUID.randomUUID().toString();
+        final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+        when(flowFileQueue.getIdentifier()).thenReturn(uuid);
+        when(flowFileQueue.isEmpty()).thenAnswer(invocation -> !queuesWithData.contains(uuid)); // evaluated on every call, so tests can toggle it via queuesWithData
+
+        final Connection connection = Mockito.mock(Connection.class);
+        when(connection.getIdentifier()).thenReturn(uuid);
+        when(connection.getSource()).thenReturn(source);
+        when(connection.getDestination()).thenReturn(destination);
+        when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
+        when(connection.getProcessGroup()).thenReturn(group);
+
+        // Re-stub the source: its outgoing connections become whatever it returned before plus the new connection
+        final Set<Connection> outgoing = source.getConnections();
+        final Set<Connection> updatedOutgoing = outgoing == null ? new HashSet<>() : new HashSet<>(outgoing);
+        updatedOutgoing.add(connection);
+        when(source.getConnections()).thenReturn(updatedOutgoing);
+
+        // Re-stub the destination the same way: previous incoming connections plus the new connection
+        final List<Connection> incoming = destination.getIncomingConnections();
+        final List<Connection> updatedIncoming = incoming == null ? new ArrayList<>() : new ArrayList<>(incoming);
+        updatedIncoming.add(connection);
+        when(destination.getIncomingConnections()).thenReturn(updatedIncoming);
+
+        // Re-stub the group so that its set of connections also contains the new connection
+        final Set<Connection> currentConnections = group.getConnections();
+        final Set<Connection> updatedConnections = currentConnections == null ? new HashSet<>() : new HashSet<>(currentConnections);
+        updatedConnections.add(connection);
+        when(group.getConnections()).thenReturn(updatedConnections);
+
+        return connection;
+    }
+
+    @Test
+    public void testSynchronizeStoppedProcessor() throws FlowSynchronizationException, TimeoutException, InterruptedException {
+        final VersionedProcessor proposed = createMinimalVersionedProcessor();
+        synchronizer.synchronize(processorA, proposed, group, synchronizationOptions);
+
+        // The name and properties must be applied, and the scheduler must never be asked to start a component.
+        verify(componentScheduler, times(0)).startComponent(any(Connectable.class));
+        verify(processorA).setName(proposed.getName());
+        verify(processorA).setProperties(proposed.getProperties(), true);
+    }
+
+    @Test
+    public void testSynchronizationStartsProcessor() throws FlowSynchronizationException, TimeoutException, InterruptedException {
+        final VersionedProcessor proposed = createMinimalVersionedProcessor();
+        proposed.setScheduledState(ScheduledState.RUNNING);
+        synchronizer.synchronize(processorA, proposed, group, synchronizationOptions);
+        // A proposed state of RUNNING must be handed to the scheduler exactly once.
+        verify(componentScheduler, times(1)).transitionComponentState(any(Connectable.class), eq(ScheduledState.RUNNING));
+    }
+
+    @Test
+    public void testRunningProcessorRestarted() throws FlowSynchronizationException, TimeoutException, InterruptedException {
+        final VersionedProcessor proposed = createMinimalVersionedProcessor();
+        proposed.setScheduledState(ScheduledState.RUNNING);
+        // The processor reports itself as running, and the group's stop request completes immediately.
+        when(processorA.isRunning()).thenReturn(true);
+        when(group.stopProcessor(processorA)).thenReturn(CompletableFuture.completedFuture(null));
+
+        synchronizer.synchronize(processorA, proposed, group, synchronizationOptions);
+        // Expect exactly one stop request, the new properties applied, and at least one start request.
+        verify(group, times(1)).stopProcessor(processorA);
+        verify(processorA).setProperties(proposed.getProperties(), true);
+        verify(componentScheduler, atLeast(1)).startComponent(any(Connectable.class));
+    }
+
+    @Test
+    public void testTimeoutWhenProcessorDoesNotStop() {
+        final VersionedProcessor versionedProcessor = createMinimalVersionedProcessor();
+        versionedProcessor.setScheduledState(ScheduledState.RUNNING);
+        startProcessor(processorA, false);
+
+        synchronizationOptions = createQuickFailSynchronizationOptions(FlowSynchronizationOptions.ComponentStopTimeoutAction.THROW_TIMEOUT_EXCEPTION);
+        assertThrows(TimeoutException.class, () -> synchronizer.synchronize(processorA, versionedProcessor, group, synchronizationOptions));
+
+        verifyStopped(processorA);
+        verifyNotRestarted(processorA);
+        verify(processorA, times(0)).terminate();
+        // The other tests in this class verify the two-argument setProperties overload as the synchronizer's update call, so checking
+        // only the one-argument overload would pass even if the processor had been reconfigured. Rule out both overloads.
+        verify(processorA, times(0)).setProperties(versionedProcessor.getProperties());
+        verify(processorA, times(0)).setProperties(eq(versionedProcessor.getProperties()), anyBoolean());
+        verify(processorA, times(0)).setName(versionedProcessor.getName());
+    }
+
+    /**
+     * When the processor's stop future never completes and the timeout action is TERMINATE,
+     * the processor is terminated once, updated with the versioned name and properties, and restarted.
+     */
+    @Test
+    public void testTerminateWhenProcessorDoesNotStop() throws FlowSynchronizationException, TimeoutException, InterruptedException {
+        final VersionedProcessor proposedProcessor = createMinimalVersionedProcessor();
+        proposedProcessor.setScheduledState(ScheduledState.RUNNING);
+        startProcessor(processorA, false);
+        synchronizationOptions = createQuickFailSynchronizationOptions(FlowSynchronizationOptions.ComponentStopTimeoutAction.TERMINATE);
+
+        synchronizer.synchronize(processorA, proposedProcessor, group, synchronizationOptions);
+
+        // Lifecycle: stop requested, then restarted
+        verifyStopped(processorA);
+        verifyRestarted(processorA);
+
+        // Exactly one terminate and one application of each update
+        verify(processorA).terminate();
+        verify(processorA).setProperties(proposedProcessor.getProperties(), true);
+        verify(processorA).setName(proposedProcessor.getName());
+    }
+
+    /**
+     * With neither endpoint started, synchronizing a connection applies the versioned name
+     * and the selected "success" relationship exactly once each.
+     */
+    @Test
+    public void testUpdateConnectionWithSourceDestStopped() throws FlowSynchronizationException, TimeoutException {
+        final VersionedConnection proposedConnection = createMinimalVersionedConnection(processorA, processorB);
+        proposedConnection.setName("Hello");
+        synchronizer.synchronize(connectionAB, proposedConnection, group, synchronizationOptions);
+
+        final Relationship successRelationship = new Relationship.Builder().name("success").build();
+        verify(connectionAB).setName("Hello");
+        verify(connectionAB).setRelationships(Collections.singleton(successRelationship));
+    }
+
+    /**
+     * Updating a connection whose source processor is running stops the source,
+     * applies the update, and restarts the source.
+     */
+    @Test
+    public void testUpdateConnectionStopsSource() throws FlowSynchronizationException, TimeoutException {
+        startProcessor(processorA);
+
+        final VersionedConnection proposedConnection = createMinimalVersionedConnection(processorA, processorB);
+        proposedConnection.setName("Hello");
+        synchronizer.synchronize(connectionAB, proposedConnection, group, synchronizationOptions);
+
+        // The new name was applied exactly once
+        verify(connectionAB).setName("Hello");
+
+        // The source went through a stop and a restart
+        verifyStopped(processorA);
+        verifyRestarted(processorA);
+    }
+
+    /**
+     * If the source processor's stop future never completes and the timeout action is TERMINATE,
+     * the source is terminated, the connection update is applied, and the source is restarted.
+     */
+    @Test
+    public void testSourceTerminatedIfNotStopped() throws FlowSynchronizationException, TimeoutException {
+        startProcessor(processorA, false);
+
+        final VersionedConnection proposedConnection = createMinimalVersionedConnection(processorA, processorB);
+        proposedConnection.setName("Hello");
+        synchronizationOptions = createQuickFailSynchronizationOptions(FlowSynchronizationOptions.ComponentStopTimeoutAction.TERMINATE);
+
+        synchronizer.synchronize(connectionAB, proposedConnection, group, synchronizationOptions);
+
+        // The source was terminated exactly once
+        verify(processorA).terminate();
+
+        // The new name was applied exactly once
+        verify(connectionAB).setName("Hello");
+
+        // The source went through a stop request and a restart
+        verifyStopped(processorA);
+        verifyRestarted(processorA);
+    }
+
+    /**
+     * If the source processor's stop future never completes and the timeout action is
+     * THROW_TIMEOUT_EXCEPTION, synchronizing the connection must fail with a TimeoutException
+     * without terminating the source and without applying the update.
+     */
+    @Test
+    public void testTimeoutStoppingSource() {
+        startProcessor(processorA, false);
+
+        final VersionedConnection versionedConnection = createMinimalVersionedConnection(processorA, processorB);
+        versionedConnection.setName("Hello");
+
+        synchronizationOptions = createQuickFailSynchronizationOptions(FlowSynchronizationOptions.ComponentStopTimeoutAction.THROW_TIMEOUT_EXCEPTION);
+
+        assertThrows(TimeoutException.class, () -> {
+            synchronizer.synchronize(connectionAB, versionedConnection, group, synchronizationOptions);
+        });
+
+        // Ensure that we did not terminate the source
+        verify(processorA, times(0)).terminate();
+
+        // Ensure that the update did not occur
+        verify(connectionAB, times(0)).setName("Hello");
+
+        // Ensure that a stop of the source was requested and that the source was not restarted
+        verifyStopped(processorA);
+        verifyNotRestarted(processorA);
+    }
+
+    /**
+     * Synchronizing an existing connection against a null versioned connection removes the
+     * connection from the group after stopping its running source.
+     */
+    @Test
+    public void testConnectionRemoval() throws FlowSynchronizationException, TimeoutException {
+        startProcessor(processorA);
+
+        synchronizer.synchronize(connectionAB, null, group, synchronizationOptions);
+
+        // Ensure that the source was stopped and not restarted, and that the connection was removed
+        verifyStopped(processorA);
+        verifyNotRestarted(processorA);
+        verify(group).removeConnection(connectionAB);
+    }
+
+    /**
+     * Removing a connection must fail with a FlowSynchronizationException when the connection is
+     * registered in queuesWithData and only the source processor has been started.
+     */
+    @Test
+    public void testFailsIfDestinationStoppedQueueNotEmpty() {
+        startProcessor(processorA);
+        queuesWithData.add(connectionAB.getIdentifier());
+
+        synchronizationOptions = createQuickFailSynchronizationOptions(FlowSynchronizationOptions.ComponentStopTimeoutAction.THROW_TIMEOUT_EXCEPTION);
+
+        assertThrows(FlowSynchronizationException.class, () -> {
+            synchronizer.synchronize(connectionAB, null, group, synchronizationOptions);
+        });
+
+        // Ensure that no update was applied to the connection
+        verify(connectionAB, times(0)).setName("Hello");
+
+        // Ensure that the source was stopped but not restarted. We don't restart in this situation because the intent is to drop
+        // the connection so we will leave the source stopped so that the data can eventually drain from the queue and the connection
+        // can be removed.
+        verifyStopped(processorA);
+        verifyNotRestarted(processorA);
+    }
+
+    /**
+     * With both source and destination started and the connection registered in queuesWithData,
+     * removal of the connection must not complete until queuesWithData is cleared; after that it
+     * must complete and the connection must be removed from the group.
+     */
+    @Test
+    public void testWaitForQueueToEmpty() throws InterruptedException {
+        startProcessor(processorA);
+        startProcessor(processorB);
+        queuesWithData.add(connectionAB.getIdentifier());
+
+        // Use a background thread to synchronize the connection.
+        final CountDownLatch completionLatch = new CountDownLatch(1);
+        final Thread syncThread = new Thread(() -> {
+            try {
+                synchronizer.synchronize(connectionAB, null, group, synchronizationOptions);
+                completionLatch.countDown();
+            } catch (final Exception e) {
+                // NOTE(review): this runs on the background thread, so the AssertionError is not reported to the
+                // test runner; a failure here only surfaces as the latch never counting down in the await below.
+                Assert.fail(e.toString());
+            }
+        });
+        syncThread.start();
+
+        // Wait up to 1/2 second to ensure that the task does not complete.
+        final boolean completed = completionLatch.await(500, TimeUnit.MILLISECONDS);
+        assertFalse(completed);
+
+        // Clear the queue's data.
+        queuesWithData.clear();
+
+        // The task should now complete quickly. Give up to 5 seconds in case this is run in a slow environment.
+        assertTrue(completionLatch.await(5, TimeUnit.SECONDS));
+
+        // Ensure that no update was applied to the connection
+        verify(connectionAB, times(0)).setName("Hello");
+
+        // Ensure that the source was stopped, destination was stopped, and the connection was removed.
+        verifyStopped(processorA);
+        verifyNotRestarted(processorA);
+        verifyStopped(processorB);
+        verifyNotRestarted(processorB);
+        verify(group, times(1)).removeConnection(connectionAB);
+    }
+
+    /**
+     * Synchronizing stopped input and output ports applies the versioned name to each port.
+     */
+    @Test
+    public void testPortUpdatedWhenStopped() throws FlowSynchronizationException, InterruptedException, TimeoutException {
+        final VersionedPort proposedInput = createMinimalVersionedPort(ComponentType.INPUT_PORT);
+        final VersionedPort proposedOutput = createMinimalVersionedPort(ComponentType.OUTPUT_PORT);
+
+        // Input port
+        synchronizer.synchronize(inputPort, proposedInput, group, synchronizationOptions);
+        verifyNotRestarted(inputPort);
+        verify(inputPort).setName("Input");
+
+        // Output port
+        synchronizer.synchronize(outputPort, proposedOutput, group, synchronizationOptions);
+        verifyNotRestarted(outputPort);
+        verify(outputPort).setName("Output");
+    }
+
+    /**
+     * Synchronizing an input port against a versioned port whose scheduled state is RUNNING
+     * renames the port and asks the component scheduler to transition it to RUNNING.
+     */
+    @Test
+    public void testPortStarted() throws FlowSynchronizationException, InterruptedException, TimeoutException {
+        final VersionedPort proposedPort = createMinimalVersionedPort(ComponentType.INPUT_PORT);
+        proposedPort.setScheduledState(ScheduledState.RUNNING);
+
+        synchronizer.synchronize(inputPort, proposedPort, group, synchronizationOptions);
+
+        verify(inputPort).setName("Input");
+        verify(componentScheduler, atLeast(1)).transitionComponentState(inputPort, ScheduledState.RUNNING);
+    }
+
+    /**
+     * Synchronizes the input port against a versioned port whose scheduled state is RUNNING and
+     * verifies that the port is renamed and transitioned to RUNNING by the component scheduler.
+     *
+     * NOTE(review): this body is identical to testPortStarted. The port is never put into a running
+     * state before synchronizing, so a stop-then-restart path does not appear to be exercised - confirm intent.
+     */
+    @Test
+    public void testPortRestarted() throws FlowSynchronizationException, InterruptedException, TimeoutException {
+        final VersionedPort versionedInputPort = createMinimalVersionedPort(ComponentType.INPUT_PORT);
+        versionedInputPort.setScheduledState(ScheduledState.RUNNING);
+        synchronizer.synchronize(inputPort, versionedInputPort, group, synchronizationOptions);
+
+        verify(componentScheduler, atLeast(1)).transitionComponentState(inputPort, ScheduledState.RUNNING);
+        verify(inputPort).setName("Input");
+    }
+
+    /**
+     * Removing an output port that is the destination of a connection must fail
+     * with a FlowSynchronizationException.
+     */
+    @Test
+    public void testRemoveOutputPortFailsIfIncomingConnection() {
+        // Give the output port an incoming connection from processorA
+        createMockConnection(processorA, outputPort, group);
+
+        assertThrows(FlowSynchronizationException.class, () -> synchronizer.synchronize(outputPort, null, group, synchronizationOptions));
+    }
+
+    /**
+     * Walks through three removal attempts for an input port with an outgoing connection:
+     * the removal goes through while the connection holds no data, fails with a
+     * FlowSynchronizationException once it holds data and the destination is not running, and
+     * fails with a TimeoutException once it holds data and the destination is running.
+     */
+    @Test
+    public void testRemoveInputPortFailsIfOutgoingConnectionNotEmpty() throws FlowSynchronizationException, InterruptedException, TimeoutException {
+        final Connection outgoingConnection = createMockConnection(inputPort, processorA, group);
+        synchronizationOptions = createQuickFailSynchronizationOptions(FlowSynchronizationOptions.ComponentStopTimeoutAction.THROW_TIMEOUT_EXCEPTION);
+
+        // Phase 1: the connection has no data, so synchronize completes without throwing.
+        synchronizer.synchronize(inputPort, null, group, synchronizationOptions);
+
+        // Phase 2: the connection has data and its destination is not running.
+        queuesWithData.add(outgoingConnection.getIdentifier());
+        assertThrows(FlowSynchronizationException.class, () -> synchronizer.synchronize(inputPort, null, group, synchronizationOptions));
+
+        // Phase 3: the connection has data and its destination is now running.
+        startProcessor(processorA);
+        assertThrows(TimeoutException.class, () -> synchronizer.synchronize(inputPort, null, group, synchronizationOptions));
+    }
+
+
+    /**
+     * Synchronizing a versioned controller service with no existing counterpart
+     * adds a controller service node to the group.
+     */
+    @Test
+    public void testAddsControllerService() throws FlowSynchronizationException, InterruptedException, TimeoutException {
+        final VersionedControllerService proposedService = createMinimalVersionedControllerService();
+
+        synchronizer.synchronize(null, proposedService, group, synchronizationOptions);
+
+        verify(group).addControllerService(any(ControllerServiceNode.class));
+    }
+
+    /**
+     * Synchronizing an active, enabled controller service against a null versioned service
+     * unschedules its referencing components, disables it, and removes it via the provider.
+     */
+    @Test
+    public void testControllerServiceRemoved() throws FlowSynchronizationException, InterruptedException, TimeoutException {
+        final ControllerServiceNode serviceNode = createMockControllerService();
+        final ControllerServiceReference serviceReference = Mockito.mock(ControllerServiceReference.class);
+
+        // Service state
+        when(serviceNode.isActive()).thenReturn(true);
+        when(serviceNode.getState()).thenReturn(ControllerServiceState.ENABLED);
+        when(serviceNode.getReferences()).thenReturn(serviceReference);
+
+        // Provider behavior: nothing to unschedule, disabling completes immediately
+        when(controllerServiceProvider.unscheduleReferencingComponents(serviceNode)).thenReturn(Collections.emptyMap());
+        when(controllerServiceProvider.disableControllerServicesAsync(anyCollection())).thenReturn(CompletableFuture.completedFuture(null));
+
+        synchronizer.synchronize(serviceNode, null, group, synchronizationOptions);
+
+        verify(controllerServiceProvider).unscheduleReferencingComponents(serviceNode);
+        verify(controllerServiceProvider).disableControllerServicesAsync(Collections.singleton(serviceNode));
+        verify(controllerServiceProvider).removeControllerService(serviceNode);
+    }
+
+    /**
+     * Updating an enabled controller service that is referenced by a running processor must
+     * unschedule the references, disable the service, apply the update, re-enable the service,
+     * and reschedule the processor that was running.
+     */
+    @Test
+    public void testReferencesStoppedAndRestarted() throws FlowSynchronizationException, InterruptedException, TimeoutException {
+        final ControllerServiceNode service = createMockControllerService();
+        when(service.isActive()).thenReturn(true);
+        when(service.getState()).thenReturn(ControllerServiceState.ENABLED);
+
+        // Make Processors A and B reference the controller service, but start only Processor B
+        setReferences(service, processorA, processorB);
+        startProcessor(processorB);
+
+        when(controllerServiceProvider.unscheduleReferencingComponents(service)).thenReturn(Collections.singletonMap(processorB, CompletableFuture.completedFuture(null)));
+        when(controllerServiceProvider.disableControllerServicesAsync(anyCollection())).thenReturn(CompletableFuture.completedFuture(null));
+
+        final VersionedControllerService versionedControllerService = createMinimalVersionedControllerService();
+        versionedControllerService.setName("Hello");
+        versionedControllerService.setScheduledState(ScheduledState.RUNNING);
+
+        synchronizer.synchronize(service, versionedControllerService, group, synchronizationOptions);
+
+        verify(controllerServiceProvider).unscheduleReferencingComponents(service);
+        verify(controllerServiceProvider).disableControllerServicesAsync(Collections.singleton(service));
+        verify(controllerServiceProvider).enableControllerServicesAsync(Collections.singleton(service));
+        // Only Processor B is expected to be rescheduled
+        verify(controllerServiceProvider).scheduleReferencingComponents(service, Collections.singleton(processorB), componentScheduler);
+        verify(service).setName("Hello");
+    }
+
+    /**
+     * When a referencing processor's unschedule future never completes and the timeout action is
+     * TERMINATE, that processor is terminated, the controller service is still updated and
+     * re-enabled, and the processor is rescheduled. The processor that was not started is not terminated.
+     */
+    @Test
+    public void testTerminateReferenceOnTimeout() throws FlowSynchronizationException, InterruptedException, TimeoutException {
+        final ControllerServiceNode service = createMockControllerService();
+        when(service.isActive()).thenReturn(true);
+        when(service.getState()).thenReturn(ControllerServiceState.ENABLED);
+
+        // Make Processors A and B reference the controller service, but start only Processor B (with a stop that never completes)
+        setReferences(service, processorA, processorB);
+        startProcessor(processorB, false);
+
+        synchronizationOptions = createQuickFailSynchronizationOptions(FlowSynchronizationOptions.ComponentStopTimeoutAction.TERMINATE);
+
+        // When unscheduleReferencingComponents is called, return a Future that will never complete.
+        when(controllerServiceProvider.unscheduleReferencingComponents(service)).thenReturn(Collections.singletonMap(processorB, new CompletableFuture<>()));
+        when(controllerServiceProvider.disableControllerServicesAsync(anyCollection())).thenReturn(CompletableFuture.completedFuture(null));
+
+        final VersionedControllerService versionedControllerService = createMinimalVersionedControllerService();
+        versionedControllerService.setName("Hello");
+        versionedControllerService.setScheduledState(ScheduledState.RUNNING);
+
+        synchronizer.synchronize(service, versionedControllerService, group, synchronizationOptions);
+
+        verify(controllerServiceProvider).unscheduleReferencingComponents(service);
+        verify(controllerServiceProvider).disableControllerServicesAsync(Collections.singleton(service));
+        verify(processorB).terminate();
+        verify(processorA, times(0)).terminate();
+        verify(controllerServiceProvider).enableControllerServicesAsync(Collections.singleton(service));
+        verify(controllerServiceProvider).scheduleReferencingComponents(service, Collections.singleton(processorB), componentScheduler);
+        verify(service).setName("Hello");
+    }
+
+
+    /**
+     * Synchronizing a versioned parameter context with no existing counterpart creates a single
+     * context named after the proposal, holding one non-sensitive parameter ("abc" = "xyz") and
+     * one sensitive parameter ("secret" = "yes").
+     *
+     * Other tests in this class call this method directly to create their initial context.
+     */
+    @Test
+    public void testCreatingParameterContext() throws FlowSynchronizationException, InterruptedException, TimeoutException {
+        final Map<String, String> parameterMap = new HashMap<>();
+        parameterMap.put("abc", "xyz");
+        parameterMap.put("secret", "yes");
+        final VersionedParameterContext proposed = createVersionedParameterContext("Context 1", parameterMap, Collections.singleton("secret"));
+
+        synchronizer.synchronize(null, proposed, synchronizationOptions);
+
+        final Set<ParameterContext> contexts = parameterContextManager.getParameterContexts();
+        assertEquals(1, contexts.size());
+
+        final ParameterContext created = contexts.iterator().next();
+        // Expected value first, actual value second, so that a failure message reads correctly
+        assertEquals(proposed.getName(), created.getName());
+
+        final Map<ParameterDescriptor, Parameter> createdParameters = created.getParameters();
+        assertEquals(2, createdParameters.size());
+
+        final Parameter abc = created.getParameter("abc").get();
+        assertEquals("abc", abc.getDescriptor().getName());
+        assertFalse(abc.getDescriptor().isSensitive());
+        assertEquals("xyz", abc.getValue());
+
+        final Parameter secret = created.getParameter("secret").get();
+        assertEquals("secret", secret.getDescriptor().getName());
+        assertTrue(secret.getDescriptor().isSensitive());
+        assertEquals("yes", secret.getValue());
+    }
+
+    /**
+     * With no referencing components stubbed, synchronizing an existing parameter context
+     * applies the proposed parameter values and the proposed name.
+     */
+    @Test
+    public void testUpdateParametersNoReferences() throws FlowSynchronizationException, InterruptedException, TimeoutException {
+        // Seed the manager with "Context 1"
+        testCreatingParameterContext();
+        final ParameterContext existingContext = parameterContextManager.getParameterContexts().iterator().next();
+
+        final Map<String, String> updatedValues = new HashMap<>();
+        updatedValues.put("abc", "123");
+        updatedValues.put("secret", "maybe");
+        final VersionedParameterContext proposedContext = createVersionedParameterContext("Context 2", updatedValues, Collections.singleton("secret"));
+
+        synchronizer.synchronize(existingContext, proposedContext, synchronizationOptions);
+
+        assertEquals("123", existingContext.getParameter("abc").get().getValue());
+        assertEquals("maybe", existingContext.getParameter("secret").get().getValue());
+        assertEquals("Context 2", existingContext.getName());
+    }
+
+    /**
+     * If a processor referencing a changed parameter never finishes stopping and the timeout
+     * action is THROW_TIMEOUT_EXCEPTION, synchronization fails with a TimeoutException and the
+     * existing context keeps its original name and parameter values.
+     */
+    @Test
+    public void testUpdateParametersReferenceProcessorNotStopping() throws FlowSynchronizationException, InterruptedException, TimeoutException {
+        // Seed the manager with "Context 1"
+        testCreatingParameterContext();
+        final ParameterContext existingContext = parameterContextManager.getParameterContexts().iterator().next();
+
+        final Map<String, String> updatedValues = new HashMap<>();
+        updatedValues.put("abc", "123");
+        updatedValues.put("secret", "maybe");
+        final VersionedParameterContext proposedContext = createVersionedParameterContext("Context 2", updatedValues, Collections.singleton("secret"));
+
+        // A running processor whose stop never completes references parameter "abc"
+        final ProcessorNode referencingProcessor = createMockProcessor();
+        startProcessor(referencingProcessor, false);
+        synchronizationOptions = createQuickFailSynchronizationOptions(FlowSynchronizationOptions.ComponentStopTimeoutAction.THROW_TIMEOUT_EXCEPTION);
+        when(parameterReferenceManager.getProcessorsReferencing(existingContext, "abc")).thenReturn(Collections.singleton(referencingProcessor));
+
+        assertThrows(TimeoutException.class, () -> synchronizer.synchronize(existingContext, proposedContext, synchronizationOptions));
+
+        // Nothing about the existing context may have changed
+        assertEquals("xyz", existingContext.getParameter("abc").get().getValue());
+        assertEquals("yes", existingContext.getParameter("secret").get().getValue());
+        assertEquals("Context 1", existingContext.getName());
+    }
+
+    /**
+     * When a controller service references a changed parameter and can be disabled, the parameter
+     * context is updated, the service is disabled and re-enabled, and the referencing processor
+     * returned by unscheduleReferencingComponents is started again.
+     *
+     * The processorA and processorB locals are fresh mocks that shadow the fields of the same name.
+     */
+    @Test
+    public void testUpdateParametersReferenceStopping() throws FlowSynchronizationException, InterruptedException, TimeoutException {
+        // Create the initial context
+        testCreatingParameterContext();
+
+        final ParameterContext existing = parameterContextManager.getParameterContexts().iterator().next();
+
+        final Map<String, String> parameterMap = new HashMap<>();
+        parameterMap.put("abc", "123");
+        parameterMap.put("secret", "maybe");
+
+        final VersionedParameterContext proposed = createVersionedParameterContext("Context 2", parameterMap, Collections.singleton("secret"));
+
+        final ProcessorNode processorA = createMockProcessor();
+        startProcessor(processorA, true);
+
+        final ProcessorNode processorB = createMockProcessor();
+
+        // Tracks whether the mocked service is active; flipped to false when disableControllerServicesAsync is invoked
+        final AtomicBoolean serviceActive = new AtomicBoolean(true);
+
+        final ControllerServiceNode service = createMockControllerService();
+        when(service.isActive()).thenAnswer(invocation -> serviceActive.get());
+        when(service.getState()).thenAnswer(invocation -> serviceActive.get() ? ControllerServiceState.ENABLED : ControllerServiceState.DISABLED);
+
+        // Make Processors A and B reference the controller service and start Processor B (Processor A was started above)
+        setReferences(service, processorA, processorB);
+        startProcessor(processorB);
+
+        when(controllerServiceProvider.unscheduleReferencingComponents(service)).thenReturn(Collections.singletonMap(processorB, CompletableFuture.completedFuture(null)));
+        when(controllerServiceProvider.disableControllerServicesAsync(anyCollection())).thenAnswer(invocation -> {
+            serviceActive.set(false);
+            return CompletableFuture.completedFuture(null);
+        });
+
+        // Parameter "abc" is referenced by the controller service only, not directly by any processor
+        when(parameterReferenceManager.getProcessorsReferencing(existing, "abc")).thenReturn(Collections.emptySet());
+        when(parameterReferenceManager.getControllerServicesReferencing(existing, "abc")).thenReturn(Collections.singleton(service));
+
+        synchronizer.synchronize(existing, proposed, synchronizationOptions);
+
+        // Updates should occur.
+        assertEquals("123", existing.getParameter("abc").get().getValue());
+        assertEquals("maybe", existing.getParameter("secret").get().getValue());
+        assertEquals("Context 2", existing.getName());
+
+        // Verify controller service/reference lifecycles
+        verify(controllerServiceProvider).unscheduleReferencingComponents(service);
+        verify(controllerServiceProvider).disableControllerServicesAsync(Collections.singleton(service));
+        verify(controllerServiceProvider).enableControllerServicesAsync(Collections.singleton(service));
+        verify(componentScheduler).startComponent(processorB);
+    }
+
+    /**
+     * When a controller service referencing a changed parameter never finishes disabling,
+     * synchronization fails with a TimeoutException even with the TERMINATE timeout action.
+     * The parameter context is left untouched, the service is not re-enabled, and both
+     * referencing processors are started again.
+     *
+     * The processorA and processorB locals are fresh mocks that shadow the fields of the same name.
+     */
+    @Test
+    public void testUpdateParametersControllerServiceNotDisabling() throws FlowSynchronizationException, InterruptedException, TimeoutException {
+        // Create the initial context
+        testCreatingParameterContext();
+
+        final ParameterContext existing = parameterContextManager.getParameterContexts().iterator().next();
+
+        final Map<String, String> parameterMap = new HashMap<>();
+        parameterMap.put("abc", "123");
+        parameterMap.put("secret", "maybe");
+
+        final VersionedParameterContext proposed = createVersionedParameterContext("Context 2", parameterMap, Collections.singleton("secret"));
+
+        final ProcessorNode processorA = createMockProcessor();
+        final ProcessorNode processorB = createMockProcessor();
+
+        final ControllerServiceNode service = createMockControllerService();
+        when(service.isActive()).thenReturn(true);
+        when(service.getState()).thenReturn(ControllerServiceState.ENABLED);
+
+        // Make Processors A and B reference the controller service and start them
+        setReferences(service, processorA, processorB);
+        startProcessor(processorA, true);
+        startProcessor(processorB);
+
+        // Both referencing processors stop immediately when unscheduled
+        final Map<ComponentNode, Future<Void>> completedFutureMap = new HashMap<>();
+        completedFutureMap.put(processorA, CompletableFuture.completedFuture(null));
+        completedFutureMap.put(processorB, CompletableFuture.completedFuture(null));
+
+        when(controllerServiceProvider.unscheduleReferencingComponents(service)).thenReturn(completedFutureMap);
+        when(controllerServiceProvider.disableControllerServicesAsync(anyCollection())).thenReturn(new CompletableFuture<>()); // Never complete future = never disable service
+
+        synchronizationOptions = createQuickFailSynchronizationOptions(FlowSynchronizationOptions.ComponentStopTimeoutAction.TERMINATE);
+
+        // Parameter "abc" is referenced by the controller service only, not directly by any processor
+        when(parameterReferenceManager.getProcessorsReferencing(existing, "abc")).thenReturn(Collections.emptySet());
+        when(parameterReferenceManager.getControllerServicesReferencing(existing, "abc")).thenReturn(Collections.singleton(service));
+
+        assertThrows(TimeoutException.class, () -> {
+            synchronizer.synchronize(existing, proposed, synchronizationOptions);
+        });
+
+        // Updates should not occur.
+        assertEquals("xyz", existing.getParameter("abc").get().getValue());
+        assertEquals("yes", existing.getParameter("secret").get().getValue());
+        assertEquals("Context 1", existing.getName());
+
+        // Verify controller service/reference lifecycles
+        verify(controllerServiceProvider).unscheduleReferencingComponents(service);
+        verify(controllerServiceProvider).disableControllerServicesAsync(Collections.singleton(service));
+        verify(controllerServiceProvider, times(0)).enableControllerServicesAsync(Collections.singleton(service));
+        verify(componentScheduler).startComponent(processorA);
+        verify(componentScheduler).startComponent(processorB);
+    }
+
+    /**
+     * Checks getUpdatedParameterNames against the context created by testCreatingParameterContext
+     * ("abc" = "xyz", sensitive "secret" = "yes"). The expected result is the set of parameter names
+     * that were added, removed, or given a different value in the proposed context.
+     */
+    @Test
+    public void testGetUpdatedParameterNames() throws FlowSynchronizationException, InterruptedException, TimeoutException {
+        testCreatingParameterContext();
+        final ParameterContext existing = parameterContextManager.getParameterContexts().iterator().next();
+
+        // Mirrors the parameters of the existing context; each scenario below starts from a copy of it
+        final Map<String, String> originalParams = new HashMap<>();
+        originalParams.put("abc", "xyz");
+        originalParams.put("secret", "yes");
+
+        // Test no changes
+        Map<String, String> parameterMap = new HashMap<>(originalParams);
+        VersionedParameterContext proposed = createVersionedParameterContext("Context 2", parameterMap, Collections.singleton("secret"));
+        assertEquals(Collections.emptySet(), synchronizer.getUpdatedParameterNames(existing, proposed));
+
+        // Test non-sensitive param change
+        parameterMap = new HashMap<>(originalParams);
+        parameterMap.put("abc", "hello");
+        proposed = createVersionedParameterContext("Context 2", parameterMap, Collections.singleton("secret"));
+        assertEquals(Collections.singleton("abc"), synchronizer.getUpdatedParameterNames(existing, proposed));
+
+        // Test sensitive param change
+        parameterMap = new HashMap<>(originalParams);
+        parameterMap.put("secret", "secret");
+        proposed = createVersionedParameterContext("Context 2", parameterMap, Collections.singleton("secret"));
+        assertEquals(Collections.singleton("secret"), synchronizer.getUpdatedParameterNames(existing, proposed));
+
+        // Test removed parameters (the proposed context has no parameters at all)
+        parameterMap.clear();
+        proposed = createVersionedParameterContext("Context 2", parameterMap, Collections.singleton("secret"));
+        assertEquals(new HashSet<>(Arrays.asList("abc", "secret")), synchronizer.getUpdatedParameterNames(existing, proposed));
+
+        // Test added parameter
+        parameterMap = new HashMap<>(originalParams);
+        parameterMap.put("Added", "Added");
+        proposed = createVersionedParameterContext("Context 2", parameterMap, Collections.singleton("secret"));
+        assertEquals(Collections.singleton("Added"), synchronizer.getUpdatedParameterNames(existing, proposed));
+
+        // Test added, removed, and updated parameters
+        parameterMap = new HashMap<>(originalParams);
+        parameterMap.put("Added", "Added");
+        parameterMap.put("Added 2", "Added");
+        parameterMap.remove("secret");
+        parameterMap.put("abc", "hello");
+        proposed = createVersionedParameterContext("Context 2", parameterMap, Collections.singleton("secret"));
+        assertEquals(new HashSet<>(Arrays.asList("abc", "secret", "Added", "Added 2")), synchronizer.getUpdatedParameterNames(existing, proposed));
+    }
+
+
+    /**
+     * Builds a VersionedParameterContext with a random identifier and a fixed description.
+     *
+     * @param name the name of the context
+     * @param parameters parameter names mapped to their values
+     * @param sensitiveParamNames names of the parameters that are to be flagged as sensitive
+     * @return the populated versioned parameter context
+     */
+    private VersionedParameterContext createVersionedParameterContext(final String name, final Map<String, String> parameters, final Set<String> sensitiveParamNames) {
+        final VersionedParameterContext versionedContext = new VersionedParameterContext();
+        versionedContext.setIdentifier(UUID.randomUUID().toString());
+        versionedContext.setName(name);
+        versionedContext.setDescription("Generated for unit test");
+
+        final Set<VersionedParameter> versionedParameters = new HashSet<>();
+        parameters.forEach((parameterName, parameterValue) -> {
+            final VersionedParameter versionedParameter = new VersionedParameter();
+            versionedParameter.setName(parameterName);
+            versionedParameter.setValue(parameterValue);
+            versionedParameter.setSensitive(sensitiveParamNames.contains(parameterName));
+            versionedParameters.add(versionedParameter);
+        });
+        versionedContext.setParameters(versionedParameters);
+
+        return versionedContext;
+    }
+
+    /**
+     * Stubs the given service so that its references report the given components as referencing it.
+     *
+     * @param service the mocked controller service to stub
+     * @param reference the components that are to be reported as referencing the service
+     */
+    private void setReferences(final ControllerServiceNode service, final ComponentNode... reference) {
+        final ControllerServiceReference serviceReference = Mockito.mock(ControllerServiceReference.class);
+        final Set<ComponentNode> referencingComponents = new HashSet<>(Arrays.asList(reference));
+
+        when(serviceReference.getReferencingComponents()).thenReturn(referencingComponents);
+        when(service.getReferences()).thenReturn(serviceReference);
+    }
+
+
+    //////////
+    // Convenience methods for testing
+    //////////
+
+    /**
+     * Stubs the given processor as running, with a stop request that completes immediately.
+     *
+     * @param processor the mocked processor to stub
+     */
+    private void startProcessor(final ProcessorNode processor) {
+        startProcessor(processor, true);
+    }
+
+    /**
+     * Stubs the given processor so that it reports itself as running, and stubs the group's
+     * stopProcessor call for it.
+     *
+     * @param processor the mocked processor to stub
+     * @param allowStopToComplete if true, stopping yields an already-completed future; otherwise
+     *                            it yields a future that is never completed
+     */
+    private void startProcessor(final ProcessorNode processor, final boolean allowStopToComplete) {
+        when(processor.isRunning()).thenReturn(true);
+        when(processor.getScheduledState()).thenReturn(org.apache.nifi.controller.ScheduledState.RUNNING);
+
+        final CompletableFuture<Void> stopFuture;
+        if (allowStopToComplete) {
+            // The stop is reported as finished right away
+            stopFuture = CompletableFuture.completedFuture(null);
+        } else {
+            // Nothing ever completes this future, so the stop never finishes
+            stopFuture = new CompletableFuture<>();
+        }
+        when(group.stopProcessor(processor)).thenReturn(stopFuture);
+    }
+
+    /**
+     * Verifies that the group was asked at least once to stop the given processor.
+     *
+     * @param processor the processor expected to have been stopped
+     */
+    private void verifyStopped(final ProcessorNode processor) {
+        verify(group, atLeast(1)).stopProcessor(processor);
+    }
+
+    /**
+     * Verifies that the group was asked at least once to stop the given port, using the
+     * input-port or output-port stop call depending on the port's connectable type.
+     *
+     * @param port the port expected to have been stopped
+     */
+    private void verifyStopped(final Port port) {
+        final boolean isInputPort = port.getConnectableType() == ConnectableType.INPUT_PORT;
+        if (!isInputPort) {
+            verify(group, atLeast(1)).stopOutputPort(port);
+            return;
+        }
+
+        verify(group, atLeast(1)).stopInputPort(port);
+    }
+
+    /**
+     * Verifies that the component scheduler was asked at least once to start the given component.
+     *
+     * @param component the component expected to have been started
+     */
+    private void verifyRestarted(final Connectable component) {
+        verify(componentScheduler, atLeast(1)).startComponent(component);
+    }
+
+    /**
+     * Intended to check that the component scheduler was not asked to start the given component.
+     *
+     * NOTE(review): atLeast(0) is satisfied by any number of invocations, including one or more, so as
+     * written this verification can never fail. times(0) would enforce the intent - confirm that every
+     * caller still passes before tightening it.
+     *
+     * @param component the component expected not to have been started
+     */
+    private void verifyNotRestarted(final Connectable component) {
+        verify(componentScheduler, atLeast(0)).startComponent(component);
+    }
+
+    /**
+     * Builds a VersionedConnection between the two given processors with a single selected
+     * relationship ("success"), no load balancing, and fixed back pressure thresholds.
+     *
+     * @param source the processor to use as the connection's source
+     * @param destination the processor to use as the connection's destination
+     * @return the populated versioned connection
+     */
+    private VersionedConnection createMinimalVersionedConnection(final ProcessorNode source, final ProcessorNode destination) {
+        final VersionedConnection connection = new VersionedConnection();
+
+        // Endpoints
+        connection.setSource(createConnectableComponent(source));
+        connection.setDestination(createConnectableComponent(destination));
+        connection.setSelectedRelationships(Collections.singleton("success"));
+
+        // Queue configuration
+        connection.setBackPressureObjectThreshold(10000L);
+        connection.setBackPressureDataSizeThreshold("1 GB");
+        connection.setLoadBalanceStrategy(LoadBalanceStrategy.DO_NOT_LOAD_BALANCE.name());
+
+        // Layout
+        connection.setLabelIndex(0);
+        connection.setzIndex(0L);
+
+        return connection;
+    }
+
+    /**
+     * Builds a ConnectableComponent of type PROCESSOR whose id and instance identifier are both
+     * the given processor's identifier.
+     *
+     * @param processor the processor to describe
+     * @return the populated connectable component
+     */
+    private ConnectableComponent createConnectableComponent(final ProcessorNode processor) {
+        final ConnectableComponent connectable = new ConnectableComponent();
+        connectable.setType(ConnectableComponentType.PROCESSOR);
+        connectable.setId(processor.getIdentifier());
+        connectable.setInstanceIdentifier(processor.getIdentifier());
+
+        return connectable;
+    }
+
+    /**
+     * Builds a VersionedProcessor with identifier "12345", name "name", scheduled state ENABLED,
+     * timer-driven scheduling, and a single property ("abc" = "123").
+     *
+     * @return the populated versioned processor
+     */
+    private VersionedProcessor createMinimalVersionedProcessor() {
+        final VersionedProcessor processor = new VersionedProcessor();
+
+        // Identity
+        processor.setIdentifier("12345");
+        processor.setName("name");
+        processor.setBundle(bundle);
+        processor.setPosition(new Position(0D, 0D));
+
+        // Configuration
+        processor.setPropertyDescriptors(Collections.emptyMap());
+        processor.setProperties(Collections.singletonMap("abc", "123"));
+        processor.setAutoTerminatedRelationships(Collections.emptySet());
+        processor.setBulletinLevel(LogLevel.WARN.name());
+
+        // Scheduling
+        processor.setScheduledState(ScheduledState.ENABLED);
+        processor.setSchedulingStrategy(SchedulingStrategy.TIMER_DRIVEN.name());
+        processor.setExecutionNode(ExecutionNode.ALL.name());
+        processor.setConcurrentlySchedulableTaskCount(1);
+        processor.setRunDurationMillis(0L);
+
+        return processor;
+    }
+
+    /**
+     * Builds a VersionedControllerService with identifier "12345", name "name", type
+     * "ControllerServiceImpl", scheduled state DISABLED, and a single property ("abc" = "123").
+     *
+     * @return the populated versioned controller service
+     */
+    private VersionedControllerService createMinimalVersionedControllerService() {
+        final VersionedControllerService service = new VersionedControllerService();
+
+        // Identity
+        service.setIdentifier("12345");
+        service.setName("name");
+        service.setType("ControllerServiceImpl");
+        service.setBundle(bundle);
+        service.setPosition(new Position(0D, 0D));
+
+        // Configuration and state
+        service.setPropertyDescriptors(Collections.emptyMap());
+        service.setProperties(Collections.singletonMap("abc", "123"));
+        service.setScheduledState(ScheduledState.DISABLED);
+
+        return service;
+    }
+
+    /**
+     * Builds a VersionedPort with identifier "1234", scheduled state ENABLED, and a name of
+     * "Input" for an input port or "Output" for any other component type.
+     *
+     * @param componentType the component type to assign to the port
+     * @return the populated versioned port
+     */
+    private VersionedPort createMinimalVersionedPort(final ComponentType componentType) {
+        final VersionedPort versionedPort = new VersionedPort();
+        versionedPort.setIdentifier("1234");
+        versionedPort.setInstanceIdentifier("1234");
+        versionedPort.setName(componentType == ComponentType.INPUT_PORT ? "Input" : "Output");
+        versionedPort.setScheduledState(ScheduledState.ENABLED);
+        // Honor the requested type; previously every port, including output ports, was created as INPUT_PORT
+        versionedPort.setComponentType(componentType);
+        versionedPort.setPosition(new Position(0D, 0D));
+        versionedPort.setConcurrentlySchedulableTaskCount(1);
+
+        return versionedPort;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
index c0923b5b6a..ff94934442 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
@@ -16,13 +16,6 @@
  */
 package org.apache.nifi.controller;
 
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Funnel;
 import org.apache.nifi.connectable.Port;
@@ -34,6 +27,13 @@ import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.exception.TerminatedTaskException;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+
 public interface ProcessScheduler {
 
     /**
@@ -224,7 +224,8 @@ public interface ProcessScheduler {
      *
      * @param taskNode to unschedule
+     * @return a Future that is completed once the Reporting Task has finished stopping
      */
-    void unschedule(ReportingTaskNode taskNode);
+    Future<Void> unschedule(ReportingTaskNode taskNode);
 
     /**
      * Begins scheduling the given Reporting Task to run
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
index dd9c1652e7..83b0b411a2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
@@ -125,4 +125,11 @@ public interface ReportingTaskNode extends ComponentNode {
      */
     List<ConfigVerificationResult> verifyConfiguration(ConfigurationContext context, ComponentLog logger, ExtensionManager extensionManager);
 
+    void start();
+
+    void stop();
+
+    void enable();
+
+    void disable();
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
index f610349691..cee1e5a303 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
@@ -19,9 +19,11 @@ package org.apache.nifi.controller.service;
 import org.apache.nifi.controller.ComponentNode;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.groups.ComponentScheduler;
 import org.apache.nifi.nar.ExtensionManager;
 
 import java.util.Collection;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
@@ -134,7 +136,7 @@ public interface ControllerServiceProvider extends ControllerServiceLookup {
      *
      * @param serviceNode the node
      */
-    Set<ComponentNode> unscheduleReferencingComponents(ControllerServiceNode serviceNode);
+    Map<ComponentNode, Future<Void>> unscheduleReferencingComponents(ControllerServiceNode serviceNode);
 
     /**
      * Verifies that all Controller Services referencing the provided Controller
@@ -201,6 +203,12 @@ public interface ControllerServiceProvider extends ControllerServiceLookup {
      */
     Set<ComponentNode> scheduleReferencingComponents(ControllerServiceNode serviceNode);
 
+    /**
+     * Schedules any of the candidate components that are currently referencing the given Controller Service to run.
+     * @return the components that were scheduled
+     */
+    Set<ComponentNode> scheduleReferencingComponents(ControllerServiceNode serviceNode, Set<ComponentNode> candidates, ComponentScheduler componentScheduler);
+
     /**
      *
      * @param serviceType type of service to get identifiers for
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/AbstractComponentScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/AbstractComponentScheduler.java
index d1f6cc1ac0..ac44116977 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/AbstractComponentScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/AbstractComponentScheduler.java
@@ -25,6 +25,7 @@ import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.flow.ScheduledState;
 import org.apache.nifi.registry.flow.mapping.VersionedComponentStateLookup;
 import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.controller.ReportingTaskNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,7 +41,8 @@ public abstract class AbstractComponentScheduler implements ComponentScheduler {
     private final VersionedComponentStateLookup stateLookup;
 
     private final AtomicLong pauseCount = new AtomicLong(0L);
-    private final Queue<Connectable> toStart = new LinkedBlockingQueue<>();
+    private final Queue<Connectable> connectablesToStart = new LinkedBlockingQueue<>();
+    private final Queue<ReportingTaskNode> reportingTasksToStart = new LinkedBlockingQueue<>();
     private final Queue<ControllerServiceNode> toEnable = new LinkedBlockingQueue<>();
 
     public AbstractComponentScheduler(final ControllerServiceProvider controllerServiceProvider, final VersionedComponentStateLookup stateLookup) {
@@ -67,10 +69,16 @@ public abstract class AbstractComponentScheduler implements ComponentScheduler {
         enableNow(toEnable);
 
         Connectable connectable;
-        while ((connectable = toStart.poll()) != null) {
+        while ((connectable = connectablesToStart.poll()) != null) {
             logger.debug("{} starting {}", this, connectable);
             startNow(connectable);
         }
+
+        ReportingTaskNode taskNode;
+        while ((taskNode = reportingTasksToStart.poll()) != null) {
+            logger.debug("{} starting {}", this, taskNode);
+            startNow(taskNode);
+        }
     }
 
     private boolean isPaused() {
@@ -180,7 +188,7 @@ public abstract class AbstractComponentScheduler implements ComponentScheduler {
     public void startComponent(final Connectable component) {
         if (isPaused()) {
             logger.debug("{} called to start {} but paused so will queue it for start later", this, component);
-            toStart.offer(component);
+            connectablesToStart.offer(component);
         } else {
             logger.debug("{} starting {} now", this, component);
             startNow(component);
@@ -228,7 +236,25 @@ public abstract class AbstractComponentScheduler implements ComponentScheduler {
         return serviceProvider;
     }
 
+    /**
+     * Starts the given Reporting Task right away, unless this scheduler is paused, in which case the task is queued to be started later.
+     *
+     * @param reportingTask the Reporting Task to start
+     */
+    @Override
+    public void startReportingTask(final ReportingTaskNode reportingTask) {
+        if (isPaused()) {
+            logger.debug("{} called to start {} but paused so will queue it for start later", this, reportingTask);
+            reportingTasksToStart.offer(reportingTask);
+        } else {
+            logger.debug("{} starting {} now", this, reportingTask);
+            startNow(reportingTask);
+        }
+    }
+
     protected abstract void startNow(Connectable component);
 
     protected abstract void enableNow(Collection<ControllerServiceNode> controllerServices);
+
+    protected abstract void startNow(ReportingTaskNode reportingTask);
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ComponentScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ComponentScheduler.java
index 6a38dde93d..ef156830e3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ComponentScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ComponentScheduler.java
@@ -18,6 +18,7 @@
 package org.apache.nifi.groups;
 
 import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.flow.ScheduledState;
 
@@ -34,6 +35,8 @@ public interface ComponentScheduler {
 
     void disableControllerServicesAsync(Collection<ControllerServiceNode> controllerServices);
 
+    void startReportingTask(ReportingTaskNode reportingTask);
+
     void pause();
 
     void resume();
@@ -60,6 +63,10 @@ public interface ComponentScheduler {
         public void disableControllerServicesAsync(final Collection<ControllerServiceNode> controllerServices) {
         }
 
+        @Override
+        public void startReportingTask(final ReportingTaskNode reportingTask) {
+        }
+
         @Override
         public void pause() {
         }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/GroupSynchronizationOptions.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowSynchronizationOptions.java
similarity index 79%
rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/GroupSynchronizationOptions.java
rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowSynchronizationOptions.java
index 661b986fd7..bc7ebb0a58 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/GroupSynchronizationOptions.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowSynchronizationOptions.java
@@ -17,7 +17,9 @@
 
 package org.apache.nifi.groups;
 
-public class GroupSynchronizationOptions {
+import java.time.Duration;
+
+public class FlowSynchronizationOptions {
     private final ComponentIdGenerator componentIdGenerator;
     private final ComponentScheduler componentScheduler;
     private final PropertyDecryptor propertyDecryptor;
@@ -27,8 +29,10 @@ public class GroupSynchronizationOptions {
     private final boolean updateGroupVersionControlSnapshot;
     private final boolean updateExistingVariables;
     private final boolean updateRpgUrls;
+    private final Duration componentStopTimeout;
+    private final ComponentStopTimeoutAction timeoutAction;
 
-    private GroupSynchronizationOptions(final Builder builder) {
+    private FlowSynchronizationOptions(final Builder builder) {
         this.componentIdGenerator = builder.componentIdGenerator;
         this.componentScheduler = builder.componentScheduler;
         this.propertyDecryptor = builder.propertyDecryptor;
@@ -38,6 +42,8 @@ public class GroupSynchronizationOptions {
         this.updateGroupVersionControlSnapshot = builder.updateGroupVersionControlSnapshot;
         this.updateExistingVariables = builder.updateExistingVariables;
         this.updateRpgUrls = builder.updateRpgUrls;
+        this.componentStopTimeout = builder.componentStopTimeout;
+        this.timeoutAction = builder.timeoutAction;
     }
 
     public ComponentIdGenerator getComponentIdGenerator() {
@@ -76,6 +82,13 @@ public class GroupSynchronizationOptions {
         return propertyDecryptor;
     }
 
+    public Duration getComponentStopTimeout() {
+        return componentStopTimeout;
+    }
+
+    public ComponentStopTimeoutAction getComponentStopTimeoutAction() {
+        return timeoutAction;
+    }
 
     public static class Builder {
         private ComponentIdGenerator componentIdGenerator;
@@ -87,6 +100,9 @@ public class GroupSynchronizationOptions {
         private boolean updateExistingVariables = false;
         private boolean updateRpgUrls = false;
         private PropertyDecryptor propertyDecryptor = value -> value;
+        private Duration componentStopTimeout = Duration.ofSeconds(30);
+        private ComponentStopTimeoutAction timeoutAction = ComponentStopTimeoutAction.THROW_TIMEOUT_EXCEPTION;
+
 
         /**
          * Specifies the Component ID Generator to use for generating UUID's of components that are to be added to a ProcessGroup
@@ -190,8 +206,28 @@ public class GroupSynchronizationOptions {
             return this;
         }
 
+        /**
+         * When stopping or disabling a component, specifies how long to wait for the component to be fully stopped/disabled
+         * @param duration the duration to wait when stopping or disabling a component
+         * @return the builder
+         */
+        public Builder componentStopTimeout(final Duration duration) {
+            this.componentStopTimeout = duration;
+            return this;
+        }
+
+        /**
+         * If the component doesn't stop/disable in time, specifies what action should be taken
+         * @param action the action to take
+         * @return the builder
+         */
+        public Builder componentStopTimeoutAction(final ComponentStopTimeoutAction action) {
+            this.timeoutAction = action;
+            return this;
+        }
+
 
-        public GroupSynchronizationOptions build() {
+        public FlowSynchronizationOptions build() {
             if (componentIdGenerator == null) {
                 throw new IllegalStateException("Must set Component ID Generator");
             }
@@ -199,10 +235,10 @@ public class GroupSynchronizationOptions {
                 throw new IllegalStateException("Must set Component Scheduler");
             }
 
-            return new GroupSynchronizationOptions(this);
+            return new FlowSynchronizationOptions(this);
         }
 
-        public static Builder from(final GroupSynchronizationOptions options) {
+        public static Builder from(final FlowSynchronizationOptions options) {
             final Builder builder = new Builder();
             builder.componentIdGenerator = options.getComponentIdGenerator();
             builder.componentScheduler = options.getComponentScheduler();
@@ -213,8 +249,23 @@ public class GroupSynchronizationOptions {
             builder.updateExistingVariables = options.isUpdateExistingVariables();
             builder.updateRpgUrls = options.isUpdateRpgUrls();
             builder.propertyDecryptor = options.getPropertyDecryptor();
+            builder.componentStopTimeout = options.getComponentStopTimeout();
+            builder.timeoutAction = options.getComponentStopTimeoutAction();
 
             return builder;
         }
     }
+
+    public enum ComponentStopTimeoutAction {
+        /**
+         * If the timeout occurs, a {@link java.util.concurrent.TimeoutException TimeoutException} should be thrown
+         */
+        THROW_TIMEOUT_EXCEPTION,
+
+        /**
+         * If a timeout occurs when stopping a processor, the Processor should be terminated and no Exception should be thrown.
+         * If a Controller Service or Reporting Task fails to stop/disable in time, a {@link java.util.concurrent.TimeoutException} will still be thrown.
+         */
+        TERMINATE;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
index 5d253c5f55..7579b721a2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
@@ -889,7 +889,7 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
      * @param synchronizationOptions options for how the synchronization should occur
      * @param flowMappingOptions options for how to map the existing dataflow into Versioned components so that it can be compared to the proposed snapshot
      */
-    void synchronizeFlow(VersionedExternalFlow proposedSnapshot, GroupSynchronizationOptions synchronizationOptions, FlowMappingOptions flowMappingOptions);
+    void synchronizeFlow(VersionedExternalFlow proposedSnapshot, FlowSynchronizationOptions synchronizationOptions, FlowMappingOptions flowMappingOptions);
 
     /**
      * Verifies a template with the specified name can be created.
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
index 18f9b980c5..c9b4da9420 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
@@ -35,12 +35,9 @@ import org.apache.nifi.parameter.ParameterLookup;
 import org.apache.nifi.registry.ComponentVariableRegistry;
 import org.apache.nifi.reporting.ReportingContext;
 import org.apache.nifi.reporting.ReportingTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class StandardReportingTaskNode extends AbstractReportingTaskNode implements ReportingTaskNode {
 
-    private static final Logger logger = LoggerFactory.getLogger(StandardReportingTaskNode.class);
     private final FlowController flowController;
 
     public StandardReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final FlowController controller,
@@ -94,4 +91,30 @@ public class StandardReportingTaskNode extends AbstractReportingTaskNode impleme
     protected ParameterContext getParameterContext() {
         return null;
     }
+
+    // Lifecycle operations: each one first calls the matching verifyCanXxx() check and then delegates to the FlowController.
+
+    @Override
+    public void start() {
+        verifyCanStart();
+        flowController.startReportingTask(this);
+    }
+
+    @Override
+    public void stop() {
+        verifyCanStop();
+        flowController.stopReportingTask(this);
+    }
+
+    @Override
+    public void enable() {
+        verifyCanEnable();
+        flowController.enableReportingTask(this);
+    }
+
+    @Override
+    public void disable() {
+        verifyCanDisable();
+        flowController.disableReportingTask(this);
+    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index a7dc11885d..d5d136b176 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -269,10 +269,10 @@ public final class StandardProcessScheduler implements ProcessScheduler {
     }
 
     @Override
-    public void unschedule(final ReportingTaskNode taskNode) {
+    public Future<Void> unschedule(final ReportingTaskNode taskNode) {
         final LifecycleState lifecycleState = getLifecycleState(requireNonNull(taskNode), false);
         if (!lifecycleState.isScheduled()) {
-            return;
+            return CompletableFuture.completedFuture(null);
         }
 
         taskNode.verifyCanStop();
@@ -280,6 +280,8 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         final ReportingTask reportingTask = taskNode.getReportingTask();
         taskNode.setScheduledState(ScheduledState.STOPPED);
 
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+
         final Runnable unscheduleReportingTaskRunnable = new Runnable() {
             @Override
             public void run() {
@@ -304,12 +306,14 @@ public final class StandardProcessScheduler implements ProcessScheduler {
 
                     if (lifecycleState.getActiveThreadCount() == 0 && lifecycleState.mustCallOnStoppedMethods()) {
                         ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, reportingTask, configurationContext);
+                        future.complete(null);
                     }
                 }
             }
         };
 
         componentLifeCycleThreadPool.execute(unscheduleReportingTaskRunnable);
+        return future;
     }
 
     /**
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
index 1dbf3b5b38..5d2011fd57 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
@@ -62,7 +62,7 @@ import org.apache.nifi.groups.AbstractComponentScheduler;
 import org.apache.nifi.groups.BundleUpdateStrategy;
 import org.apache.nifi.groups.ComponentIdGenerator;
 import org.apache.nifi.groups.ComponentScheduler;
-import org.apache.nifi.groups.GroupSynchronizationOptions;
+import org.apache.nifi.groups.FlowSynchronizationOptions;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.parameter.Parameter;
@@ -332,7 +332,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
                 allTemplates.forEach(template -> template.getProcessGroup().removeTemplate(template));
 
                 // Synchronize the root group
-                final GroupSynchronizationOptions syncOptions = new GroupSynchronizationOptions.Builder()
+                final FlowSynchronizationOptions syncOptions = new FlowSynchronizationOptions.Builder()
                     .componentIdGenerator(componentIdGenerator)
                     .componentScheduler(componentScheduler)
                     .ignoreLocalModifications(true)
@@ -1021,5 +1021,10 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
         protected void enableNow(final Collection<ControllerServiceNode> controllerServices) {
             flowController.getControllerServiceProvider().enableControllerServices(controllerServices);
         }
+
+        @Override
+        protected void startNow(final ReportingTaskNode reportingTask) {
+            flowController.startReportingTask(reportingTask);
+        }
     }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
index 2a2e641bcf..35657d0d3f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
@@ -39,7 +39,7 @@ import org.apache.nifi.groups.DataValve;
 import org.apache.nifi.groups.FlowFileConcurrency;
 import org.apache.nifi.groups.FlowFileGate;
 import org.apache.nifi.groups.FlowFileOutboundPolicy;
-import org.apache.nifi.groups.GroupSynchronizationOptions;
+import org.apache.nifi.groups.FlowSynchronizationOptions;
 import org.apache.nifi.groups.NoOpBatchCounts;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.ProcessGroupCounts;
@@ -719,7 +719,7 @@ public class MockProcessGroup implements ProcessGroup {
     }
 
     @Override
-    public void synchronizeFlow(final VersionedExternalFlow proposedSnapshot, final GroupSynchronizationOptions synchronizationOptions, final FlowMappingOptions flowMappingOptions) {
+    public void synchronizeFlow(final VersionedExternalFlow proposedSnapshot, final FlowSynchronizationOptions synchronizationOptions, final FlowMappingOptions flowMappingOptions) {
     }
 
     @Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
index 055d0251ef..3210347b22 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
@@ -243,7 +243,7 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
             if (ScheduledState.RUNNING.equals(scheduledState)) {
                 return serviceProvider.scheduleReferencingComponents(controllerService);
             } else {
-                return serviceProvider.unscheduleReferencingComponents(controllerService);
+                return serviceProvider.unscheduleReferencingComponents(controllerService).keySet();
             }
         }
 
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingTaskNode.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingTaskNode.java
index 5a24f371f7..fdcc874420 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingTaskNode.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingTaskNode.java
@@ -56,6 +56,24 @@ public class StatelessReportingTaskNode extends AbstractReportingTaskNode implem
         return new StatelessReportingContext(statelessEngine, flowManager, getEffectivePropertyValues(), getReportingTask(), getVariableRegistry(), getParameterLookup());
     }
 
+    @Override
+    public void start() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void stop() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void enable() {
+    }
+
+    @Override
+    public void disable() {
+    }
+
     @Override
     public Class<?> getComponentClass() {
         return null;
@@ -80,4 +98,6 @@ public class StatelessReportingTaskNode extends AbstractReportingTaskNode implem
     public Resource getResource() {
         return null;
     }
+
+
 }
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
index 5b32d2882d..b47b43a483 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
@@ -245,7 +245,8 @@ public class StatelessProcessScheduler implements ProcessScheduler {
     }
 
     @Override
-    public void unschedule(final ReportingTaskNode taskNode) {
+    public Future<Void> unschedule(final ReportingTaskNode taskNode) {
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override