You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2022/04/21 14:44:24 UTC
[nifi] branch main updated: NIFI-9940: Renamed StandardProcessGroupSynchronizer to StandardVersionedComponentSynchronizer. Added synching for processors, input/output ports, connections, etc. Added unit tests.
This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 5a3c7d1fbd NIFI-9940: Renamed StandardProcessGroupSynchronizer to StandardVersionedComponentSynchronizer. Added synching for processors, input/output ports, connections, etc. Added unit tests.
5a3c7d1fbd is described below
commit 5a3c7d1fbd78e78c79d206e34b6f30e25b108faf
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Thu Mar 3 15:41:59 2022 -0500
NIFI-9940: Renamed StandardProcessGroupSynchronizer to StandardVersionedComponentSynchronizer. Added synching for processors, input/output ports, connections, etc. Added unit tests.
This closes #5983
---
.../nifi/controller/StandardProcessorNode.java | 24 +-
.../service/StandardControllerServiceProvider.java | 45 +-
.../FlowSynchronizationException.java | 32 +
.../StandardVersionedComponentSynchronizer.java} | 1445 ++++++++++++++++++--
.../VersionedComponentSynchronizer.java | 201 +++
.../VersionedFlowSynchronizationContext.java} | 12 +-
.../nifi/groups/DefaultComponentScheduler.java | 5 +
.../nifi/groups/ProcessGroupSynchronizer.java | 36 -
.../RetainExistingStateComponentScheduler.java | 6 +
.../apache/nifi/groups/StandardProcessGroup.java | 20 +-
.../nifi/remote/StandardRemoteProcessGroup.java | 2 +-
...StandardVersionedComponentSynchronizerTest.java | 1025 ++++++++++++++
.../apache/nifi/controller/ProcessScheduler.java | 16 +-
.../apache/nifi/controller/ReportingTaskNode.java | 7 +
.../service/ControllerServiceProvider.java | 10 +-
.../nifi/groups/AbstractComponentScheduler.java | 26 +-
.../org/apache/nifi/groups/ComponentScheduler.java | 6 +
...ptions.java => FlowSynchronizationOptions.java} | 61 +-
.../java/org/apache/nifi/groups/ProcessGroup.java | 2 +-
.../reporting/StandardReportingTaskNode.java | 25 +-
.../scheduling/StandardProcessScheduler.java | 8 +-
.../serialization/VersionedFlowSynchronizer.java | 8 +-
.../controller/service/mock/MockProcessGroup.java | 4 +-
.../web/dao/impl/StandardControllerServiceDAO.java | 2 +-
.../reporting/StatelessReportingTaskNode.java | 20 +
.../scheduling/StatelessProcessScheduler.java | 3 +-
26 files changed, 2842 insertions(+), 209 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index f743ac530b..de860a7b95 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -185,12 +185,11 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
this.processorRef = new AtomicReference<>(processorDetails);
identifier = new AtomicReference<>(uuid);
- destinations = new HashMap<>();
- connections = new HashMap<>();
+ destinations = new ConcurrentHashMap<>();
+ connections = new ConcurrentHashMap<>();
incomingConnections = new AtomicReference<>(new ArrayList<>());
lossTolerant = new AtomicBoolean(false);
- final Set<Relationship> emptySetOfRelationships = new HashSet<>();
- undefinedRelationshipsToTerminate = new AtomicReference<>(emptySetOfRelationships);
+ undefinedRelationshipsToTerminate = new AtomicReference<>(Collections.emptySet());
comments = new AtomicReference<>("");
schedulingPeriod = new AtomicReference<>("0 sec");
schedulingNanos = new AtomicLong(MINIMUM_SCHEDULING_NANOS);
@@ -405,11 +404,8 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
@Override
public boolean isAutoTerminated(final Relationship relationship) {
- if (relationship.isAutoTerminated() && getConnections(relationship).isEmpty()) {
- return true;
- }
- final Set<Relationship> terminatable = undefinedRelationshipsToTerminate.get();
- return terminatable == null ? false : terminatable.contains(relationship);
+ final boolean markedAutoTerminate = relationship.isAutoTerminated() || undefinedRelationshipsToTerminate.get().contains(relationship);
+ return markedAutoTerminate && getConnections(relationship).isEmpty();
}
@Override
@@ -418,13 +414,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
}
- for (final Relationship rel : terminate) {
- if (!getConnections(rel).isEmpty()) {
- throw new IllegalStateException("Cannot mark relationship '" + rel.getName()
- + "' as auto-terminated because Connection already exists with this relationship");
- }
- }
-
undefinedRelationshipsToTerminate.set(new HashSet<>(terminate));
LOG.debug("Resetting Validation State of {} due to setting auto-terminated relationships", this);
resetValidationState();
@@ -726,8 +715,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
@Override
public Set<Connection> getConnections(final Relationship relationship) {
final Set<Connection> applicableConnections = connections.get(relationship);
- return (applicableConnections == null) ? Collections.<Connection> emptySet()
- : Collections.unmodifiableSet(applicableConnections);
+ return (applicableConnections == null) ? Collections.emptySet() : Collections.unmodifiableSet(applicableConnections);
}
@Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index a8e032a73a..3e7cfdf5aa 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -25,9 +25,12 @@ import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.events.BulletinFactory;
+import org.apache.nifi.groups.ComponentScheduler;
+import org.apache.nifi.groups.DefaultComponentScheduler;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.registry.flow.mapping.VersionedComponentStateLookup;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
@@ -36,6 +39,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -96,37 +100,54 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
@Override
public Set<ComponentNode> scheduleReferencingComponents(final ControllerServiceNode serviceNode) {
+ return scheduleReferencingComponents(serviceNode, null, new DefaultComponentScheduler(this, VersionedComponentStateLookup.IDENTITY_LOOKUP));
+ }
+
+ public Set<ComponentNode> scheduleReferencingComponents(final ControllerServiceNode serviceNode, final Set<ComponentNode> candidates, final ComponentScheduler componentScheduler) {
// find all of the schedulable components (processors, reporting tasks) that refer to this Controller Service,
// or a service that references this controller service, etc.
final List<ProcessorNode> processors = serviceNode.getReferences().findRecursiveReferences(ProcessorNode.class);
final List<ReportingTaskNode> reportingTasks = serviceNode.getReferences().findRecursiveReferences(ReportingTaskNode.class);
- final Set<ComponentNode> updated = new HashSet<>();
-
// verify that we can start all components (that are not disabled) before doing anything
for (final ProcessorNode node : processors) {
+ if (candidates != null && !candidates.contains(node)) {
+ continue;
+ }
+
if (node.getScheduledState() != ScheduledState.DISABLED) {
node.verifyCanStart();
- updated.add(node);
}
}
for (final ReportingTaskNode node : reportingTasks) {
+ if (candidates != null && !candidates.contains(node)) {
+ continue;
+ }
+
if (node.getScheduledState() != ScheduledState.DISABLED) {
node.verifyCanStart();
- updated.add(node);
}
}
// start all of the components that are not disabled
+ final Set<ComponentNode> updated = new HashSet<>();
for (final ProcessorNode node : processors) {
+ if (candidates != null && !candidates.contains(node)) {
+ continue;
+ }
+
if (node.getScheduledState() != ScheduledState.DISABLED) {
- node.getProcessGroup().startProcessor(node, true);
+ componentScheduler.startComponent(node);
updated.add(node);
}
}
for (final ReportingTaskNode node : reportingTasks) {
+ if (candidates != null && !candidates.contains(node)) {
+ continue;
+ }
+
if (node.getScheduledState() != ScheduledState.DISABLED) {
- processScheduler.schedule(node);
+ componentScheduler.startReportingTask(node);
updated.add(node);
}
}
@@ -135,13 +156,13 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
@Override
- public Set<ComponentNode> unscheduleReferencingComponents(final ControllerServiceNode serviceNode) {
+ public Map<ComponentNode, Future<Void>> unscheduleReferencingComponents(final ControllerServiceNode serviceNode) {
// find all of the schedulable components (processors, reporting tasks) that refer to this Controller Service,
// or a service that references this controller service, etc.
final List<ProcessorNode> processors = serviceNode.getReferences().findRecursiveReferences(ProcessorNode.class);
final List<ReportingTaskNode> reportingTasks = serviceNode.getReferences().findRecursiveReferences(ReportingTaskNode.class);
- final Set<ComponentNode> updated = new HashSet<>();
+ final Map<ComponentNode, Future<Void>> updated = new HashMap<>();
// verify that we can stop all components (that are running) before doing anything
for (final ProcessorNode node : processors) {
@@ -158,14 +179,14 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
// stop all of the components that are running
for (final ProcessorNode node : processors) {
if (node.getScheduledState() == ScheduledState.RUNNING) {
- node.getProcessGroup().stopProcessor(node);
- updated.add(node);
+ final Future<Void> future = node.getProcessGroup().stopProcessor(node);
+ updated.put(node, future);
}
}
for (final ReportingTaskNode node : reportingTasks) {
if (node.getScheduledState() == ScheduledState.RUNNING) {
- processScheduler.unschedule(node);
- updated.add(node);
+ final Future<Void> future = processScheduler.unschedule(node);
+ updated.put(node, future);
}
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/FlowSynchronizationException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/FlowSynchronizationException.java
new file mode 100644
index 0000000000..974d15b1fe
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/FlowSynchronizationException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.flow.synchronization;
+
+public class FlowSynchronizationException extends Exception {
+ public FlowSynchronizationException(final String message) {
+ super(message);
+ }
+
+ public FlowSynchronizationException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+
+ public FlowSynchronizationException(final Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
similarity index 63%
rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
index 8c5592dc18..d498d5fe38 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.nifi.groups;
+package org.apache.nifi.flow.synchronization;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.bundle.BundleCoordinate;
@@ -31,6 +31,7 @@ import org.apache.nifi.controller.BackoffMechanism;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.PropertyConfiguration;
+import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
@@ -39,6 +40,7 @@ import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.encrypt.EncryptionException;
import org.apache.nifi.flow.BatchSize;
@@ -52,19 +54,33 @@ import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedFlowCoordinates;
import org.apache.nifi.flow.VersionedFunnel;
import org.apache.nifi.flow.VersionedLabel;
+import org.apache.nifi.flow.VersionedParameter;
+import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
-import org.apache.nifi.flow.VersionedPropertyDescriptor;
import org.apache.nifi.flow.VersionedRemoteGroupPort;
import org.apache.nifi.flow.VersionedRemoteProcessGroup;
+import org.apache.nifi.flow.VersionedReportingTask;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.groups.ComponentIdGenerator;
+import org.apache.nifi.groups.FlowFileConcurrency;
+import org.apache.nifi.groups.FlowFileOutboundPolicy;
+import org.apache.nifi.groups.FlowSynchronizationOptions;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.PropertyDecryptor;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
+import org.apache.nifi.groups.StandardVersionedFlowStatus;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.parameter.Parameter;
import org.apache.nifi.parameter.ParameterContext;
+import org.apache.nifi.parameter.ParameterContextManager;
import org.apache.nifi.parameter.ParameterDescriptor;
+import org.apache.nifi.parameter.ParameterReferenceManager;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.FlowRegistry;
@@ -72,8 +88,6 @@ import org.apache.nifi.registry.flow.StandardVersionControlInformation;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowState;
-import org.apache.nifi.flow.VersionedParameter;
-import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
import org.apache.nifi.registry.flow.diff.DifferenceType;
import org.apache.nifi.registry.flow.diff.FlowComparator;
@@ -97,6 +111,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URL;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -107,25 +122,29 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
+import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.stream.Collectors;
-public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronizer {
- private static final Logger LOG = LoggerFactory.getLogger(StandardProcessGroupSynchronizer.class);
+public class StandardVersionedComponentSynchronizer implements VersionedComponentSynchronizer {
+ private static final Logger LOG = LoggerFactory.getLogger(StandardVersionedComponentSynchronizer.class);
private static final String TEMP_FUNNEL_ID_SUFFIX = "-temp-funnel";
public static final String ENC_PREFIX = "enc{";
public static final String ENC_SUFFIX = "}";
- private final ProcessGroupSynchronizationContext context;
+ private final VersionedFlowSynchronizationContext context;
private final Set<String> updatedVersionedComponentIds = new HashSet<>();
private Set<String> preExistingVariables = new HashSet<>();
- private GroupSynchronizationOptions syncOptions;
+ private FlowSynchronizationOptions syncOptions;
- public StandardProcessGroupSynchronizer(final ProcessGroupSynchronizationContext context) {
+ public StandardVersionedComponentSynchronizer(final VersionedFlowSynchronizationContext context) {
this.context = context;
}
@@ -138,13 +157,12 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
this.updatedVersionedComponentIds.addAll(updatedVersionedComponentIds);
}
- public void setSynchronizationOptions(final GroupSynchronizationOptions syncOptions) {
+ public void setSynchronizationOptions(final FlowSynchronizationOptions syncOptions) {
this.syncOptions = syncOptions;
}
@Override
- public void synchronize(final ProcessGroup group, final VersionedExternalFlow versionedExternalFlow, final GroupSynchronizationOptions options) {
-
+ public void synchronize(final ProcessGroup group, final VersionedExternalFlow versionedExternalFlow, final FlowSynchronizationOptions options) {
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(context.getExtensionManager(), context.getFlowMappingOptions());
final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(group, context.getControllerServiceProvider(), context.getFlowRegistryClient(), true);
@@ -228,7 +246,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
}
private void synchronize(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, VersionedParameterContext> versionedParameterContexts)
- throws ProcessorInstantiationException {
+ throws ProcessorInstantiationException {
// Some components, such as Processors, may have a Scheduled State of RUNNING in the proposed flow. However, if we
// transition the service into the RUNNING state, and then we need to update a Connection that is connected to it,
@@ -322,9 +340,6 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
// 10. Update connections to match those in the proposed group
// 11. Delete the temporary destination that was created above
- // Keep track of any processors that have been updated to have auto-terminated relationships so that we can set those
- // auto-terminated relationships after we've handled creating/deleting necessary connections.
- final Map<ProcessorNode, Set<Relationship>> autoTerminatedRelationships = new HashMap<>();
// During the flow update, we will use temporary names for process group ports. This is because port names must be
// unique within a process group, but during an update we might temporarily be in a state where two ports have the same name.
@@ -378,7 +393,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
synchronizeInputPorts(group, proposed, proposedPortFinalNames, inputPortsByVersionedId);
synchronizeOutputPorts(group, proposed, proposedPortFinalNames, outputPortsByVersionedId);
synchronizeLabels(group, proposed, labelsByVersionedId);
- synchronizeProcessors(group, proposed, autoTerminatedRelationships, processorsByVersionedId);
+ synchronizeProcessors(group, proposed, processorsByVersionedId);
synchronizeRemoteGroups(group, proposed, rpgsByVersionedId);
} finally {
// Make sure that we reset the connections
@@ -389,12 +404,6 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
// We can now add in any necessary connections, since all connectable components have now been created.
synchronizeConnections(group, proposed, connectionsByVersionedId);
- // Once the appropriate connections have been removed, we may now update Processors' auto-terminated relationships.
- // We cannot do this above, in the 'updateProcessor' call because if a connection is removed and changed to auto-terminated,
- // then updating this in the updateProcessor call above would attempt to set the Relationship to being auto-terminated while a
- // Connection for that relationship exists. This will throw an Exception.
- autoTerminatedRelationships.forEach(ProcessorNode::setAutoTerminatedRelationships);
-
// All ports have now been added/removed as necessary. We can now resolve the port names.
updatePortsToFinalNames(proposedPortFinalNames);
@@ -428,10 +437,10 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
LOG.info("Added {} to {}", added, group);
} else if (childCoordinates == null || syncOptions.isUpdateDescendantVersionedFlows()) {
- final StandardProcessGroupSynchronizer sync = new StandardProcessGroupSynchronizer(context);
+ final StandardVersionedComponentSynchronizer sync = new StandardVersionedComponentSynchronizer(context);
sync.setPreExistingVariables(preExistingVariables);
sync.setUpdatedVersionedComponentIds(updatedVersionedComponentIds);
- final GroupSynchronizationOptions options = GroupSynchronizationOptions.Builder.from(syncOptions)
+ final FlowSynchronizationOptions options = FlowSynchronizationOptions.Builder.from(syncOptions)
.updateGroupSettings(true)
.build();
@@ -456,8 +465,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
for (final VersionedControllerService proposedService : proposed.getControllerServices()) {
ControllerServiceNode service = servicesByVersionedId.get(proposedService.getIdentifier());
if (service == null) {
- service = addControllerService(group, proposedService.getIdentifier(), proposedService.getInstanceIdentifier(),
- proposedService.getType(), proposedService.getBundle(), context.getComponentIdGenerator());
+ service = addControllerService(group, proposedService, context.getComponentIdGenerator());
LOG.info("Added {} to {}", service, group);
servicesAdded.put(proposedService.getIdentifier(), service);
@@ -520,7 +528,6 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
final Connection connection = connectionsByVersionedId.get(removedVersionedId);
LOG.info("Removing {} from {}", connection, group);
group.removeConnection(connection);
- context.getFlowManager().onConnectionRemoved(connection);
}
}
@@ -634,7 +641,6 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())), Function.identity()));
}
-
private <T> Map<String, T> componentsById(final ProcessGroup group, final Function<ProcessGroup, Collection<T>> retrieveComponents,
final Function<T, String> retrieveId, final Function<T, Optional<String>> retrieveVersionedComponentId) {
@@ -643,7 +649,6 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
NiFiRegistryFlowMapper.generateVersionedComponentId(retrieveId.apply(component))), Function.identity()));
}
-
private void synchronizeFunnels(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, Funnel> funnelsByVersionedId) {
for (final VersionedFunnel proposedFunnel : proposed.getFunnels()) {
final Funnel funnel = funnelsByVersionedId.get(proposedFunnel.getIdentifier());
@@ -661,7 +666,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
}
private void synchronizeInputPorts(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<Port, String> proposedPortFinalNames,
- final Map<String, Port> inputPortsByVersionedId) {
+ final Map<String, Port> inputPortsByVersionedId) {
for (final VersionedPort proposedPort : proposed.getInputPorts()) {
final Port port = inputPortsByVersionedId.get(proposedPort.getIdentifier());
if (port == null) {
@@ -803,7 +808,8 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
}
private <C, V extends VersionedComponent> void removeMissingComponents(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, C> componentsById,
- final Function<VersionedProcessGroup, Collection<V>> getVersionedComponents, final BiConsumer<ProcessGroup, C> removeComponent) {
+ final Function<VersionedProcessGroup, Collection<V>> getVersionedComponents,
+ final BiConsumer<ProcessGroup, C> removeComponent) {
// Determine the ID's of the components to remove. To do this, we get the ID's of all components in the Process Group,
// and then remove from that the ID's of the components in the proposed group. That leaves us with the ID's of components
@@ -821,33 +827,16 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
}
}
+ private void synchronizeProcessors(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, ProcessorNode> processorsByVersionedId)
+ throws ProcessorInstantiationException {
- private void synchronizeProcessors(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<ProcessorNode, Set<Relationship>> autoTerminatedRelationships,
- final Map<String, ProcessorNode> processorsByVersionedId) throws ProcessorInstantiationException {
for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) {
final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier());
if (processor == null) {
final ProcessorNode added = addProcessor(group, proposedProcessor, context.getComponentIdGenerator());
- context.getFlowManager().onProcessorAdded(added);
-
- final Set<Relationship> proposedAutoTerminated =
- proposedProcessor.getAutoTerminatedRelationships() == null ? Collections.emptySet() : proposedProcessor.getAutoTerminatedRelationships().stream()
- .map(added::getRelationship)
- .collect(Collectors.toSet());
- autoTerminatedRelationships.put(added, proposedAutoTerminated);
LOG.info("Added {} to {}", added, group);
} else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) {
updateProcessor(processor, proposedProcessor);
-
- final Set<Relationship> proposedAutoTerminated =
- proposedProcessor.getAutoTerminatedRelationships() == null ? Collections.emptySet() : proposedProcessor.getAutoTerminatedRelationships().stream()
- .map(processor::getRelationship)
- .collect(Collectors.toSet());
-
- if (!processor.getAutoTerminatedRelationships().equals(proposedAutoTerminated)) {
- autoTerminatedRelationships.put(processor, proposedAutoTerminated);
- }
-
LOG.info("Updated {}", processor);
} else {
processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY()));
@@ -855,6 +844,18 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
}
}
+ private Set<Relationship> getAutoTerminatedRelationships(final ProcessorNode processor, final VersionedProcessor proposedProcessor) {
+ final Set<String> relationshipNames = proposedProcessor.getAutoTerminatedRelationships();
+ if (relationshipNames == null) {
+ return Collections.emptySet();
+ }
+
+ return relationshipNames.stream()
+ .map(processor::getRelationship)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+ }
+
private void synchronizeRemoteGroups(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, RemoteProcessGroup> rpgsByVersionedId) {
for (final VersionedRemoteProcessGroup proposedRpg : proposed.getRemoteProcessGroups()) {
final RemoteProcessGroup rpg = rpgsByVersionedId.get(proposedRpg.getIdentifier());
@@ -1013,11 +1014,11 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
destination.addProcessGroup(group);
- final StandardProcessGroupSynchronizer sync = new StandardProcessGroupSynchronizer(context);
+ final StandardVersionedComponentSynchronizer sync = new StandardVersionedComponentSynchronizer(context);
sync.setPreExistingVariables(variablesToSkip);
sync.setUpdatedVersionedComponentIds(updatedVersionedComponentIds);
- final GroupSynchronizationOptions options = GroupSynchronizationOptions.Builder.from(syncOptions)
+ final FlowSynchronizationOptions options = FlowSynchronizationOptions.Builder.from(syncOptions)
.updateGroupSettings(true)
.build();
sync.setSynchronizationOptions(options);
@@ -1026,23 +1027,125 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
return group;
}
- private ControllerServiceNode addControllerService(final ProcessGroup destination, final String versionedId, final String instanceId, final String type, final Bundle bundle,
- final ComponentIdGenerator componentIdGenerator) {
- final String id = componentIdGenerator.generateUuid(versionedId, instanceId, destination.getIdentifier());
- LOG.debug("Adding Controller Service with ID {} of type {}", id, type);
+ // Creates a Controller Service from the proposed versioned service and registers it with its owner:
+ // the root controller-level services when destination is null, otherwise the given Process Group.
+ // Returns the newly created service node.
+ private ControllerServiceNode addControllerService(final ProcessGroup destination, final VersionedControllerService proposed, final ComponentIdGenerator componentIdGenerator) {
+ // With no destination group, the literal "Controller" stands in for the group identifier when generating the ID
+ final String destinationId = destination == null ? "Controller" : destination.getIdentifier();
+ final String identifier = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destinationId);
+ LOG.debug("Adding Controller Service with ID {} of type {}", identifier, proposed.getType());
- final BundleCoordinate coordinate = toCoordinate(bundle);
- final boolean firstTimeAdded = true;
+ final BundleCoordinate coordinate = toCoordinate(proposed.getBundle());
final Set<URL> additionalUrls = Collections.emptySet();
+ final ControllerServiceNode newService = context.getFlowManager().createControllerService(proposed.getType(), identifier, coordinate, additionalUrls, true, true, null);
+ // Remember which versioned component this instance was created from
+ newService.setVersionedComponentId(proposed.getIdentifier());
- final ControllerServiceNode newService = context.getFlowManager().createControllerService(type, id, coordinate, additionalUrls, firstTimeAdded, true, null);
- newService.setVersionedComponentId(versionedId);
-
- destination.addControllerService(newService);
+ if (destination == null) {
+ context.getFlowManager().addRootControllerService(newService);
+ } else {
+ destination.addControllerService(newService);
+ }
return newService;
}
+ /**
+  * Verifies that the given Controller Service is in a state that allows it to be synchronized with the proposal.
+  * Nothing is verified when the service does not exist yet, since it will simply be created.
+  *
+  * @param controllerService the existing service, or null if it is to be created
+  * @param proposed the proposed versioned service, or null if the existing service is to be removed
+  */
+ private void verifyCanSynchronize(final ControllerServiceNode controllerService, final VersionedControllerService proposed) {
+     if (controllerService == null) {
+         // Nothing exists yet, so creating the proposed service is always possible.
+         return;
+     }
+
+     if (proposed != null) {
+         // Both sides exist: the service will be updated in place.
+         controllerService.verifyCanUpdate();
+     } else {
+         // No proposed counterpart: the service will be removed.
+         controllerService.verifyCanDelete();
+     }
+ }
+
+ /**
+  * Synchronizes a single Controller Service with the proposed versioned service.
+  * A null controllerService means the proposed service is added to the given group; a null proposed means the
+  * existing service is removed; when both are non-null the existing service is updated; when both are null nothing is done.
+  * The options' component scheduler is paused for the duration of the call and resumed in a finally block.
+  *
+  * @param controllerService the existing service, or null if it is to be created
+  * @param proposed the proposed versioned service, or null if the existing service is to be removed
+  * @param group the Process Group handed to addControllerService when a new service is created
+  * @param synchronizationOptions supplies the stop timeout, timeout action, ID generator and component scheduler
+  * @throws FlowSynchronizationException if adding, updating or removing the service throws any Exception
+  */
+ @Override
+ public void synchronize(final ControllerServiceNode controllerService, final VersionedControllerService proposed, final ProcessGroup group,
+ final FlowSynchronizationOptions synchronizationOptions) throws FlowSynchronizationException, TimeoutException, InterruptedException {
+ if (controllerService == null && proposed == null) {
+ return;
+ }
+
+ setSynchronizationOptions(synchronizationOptions);
+
+ // Absolute deadline (epoch millis), derived from the configured component stop timeout
+ final long timeout = System.currentTimeMillis() + synchronizationOptions.getComponentStopTimeout().toMillis();
+ final ControllerServiceProvider serviceProvider = context.getControllerServiceProvider();
+
+ synchronizationOptions.getComponentScheduler().pause();
+ try {
+ // Disable the controller service, if necessary, in order to update it.
+ final Set<ComponentNode> referencesToRestart = new HashSet<>();
+ final Set<ControllerServiceNode> servicesToRestart = new HashSet<>();
+
+ try {
+ // Verification happens only after the stop call, i.e. against the stopped state
+ stopControllerService(controllerService, proposed, timeout, synchronizationOptions.getComponentStopTimeoutAction(), referencesToRestart, servicesToRestart);
+ verifyCanSynchronize(controllerService, proposed);
+
+ try {
+ if (proposed == null) {
+ serviceProvider.removeControllerService(controllerService);
+ LOG.info("Successfully synchronized {} by removing it from the flow", controllerService);
+ } else if (controllerService == null) {
+ final ControllerServiceNode added = addControllerService(group, proposed, synchronizationOptions.getComponentIdGenerator());
+
+ // A new service is queued for enabling (in the finally block below) only when the proposal says ENABLED
+ if (proposed.getScheduledState() == org.apache.nifi.flow.ScheduledState.ENABLED) {
+ servicesToRestart.add(added);
+ }
+
+ LOG.info("Successfully synchronized {} by adding it to the flow", added);
+ } else {
+ updateControllerService(controllerService, proposed);
+
+ if (proposed.getScheduledState() == org.apache.nifi.flow.ScheduledState.ENABLED) {
+ servicesToRestart.add(controllerService);
+ }
+
+ LOG.info("Successfully synchronized {} by updating it to match proposed version", controllerService);
+ }
+ } catch (final Exception e) {
+ // NOTE(review): controllerService is null on the add path, so this message then names "null" - consider including the proposed service
+ throw new FlowSynchronizationException("Failed to synchronize Controller Service " + controllerService + " with proposed version", e);
+ }
+ } finally {
+ // Re-enable the controller service if necessary
+ serviceProvider.enableControllerServicesAsync(servicesToRestart);
+
+ // Restart any components that need to be restarted.
+ if (controllerService != null) {
+ serviceProvider.scheduleReferencingComponents(controllerService, referencesToRestart, context.getComponentScheduler());
+ }
+ }
+ } finally {
+ synchronizationOptions.getComponentScheduler().resume();
+ }
+ }
+
+ /**
+  * Blocks until the given stop/disable future completes or the deadline passes.
+  *
+  * @param future the future representing the pending stop/disable operation
+  * @param component the component being stopped; used in messages and, if it is a ProcessorNode, for termination
+  * @param timeout absolute deadline in epoch milliseconds (not a duration)
+  * @param timeoutAction what to do when a ProcessorNode fails to stop before the deadline
+  * @throws InterruptedException if the wait is interrupted; the original exception is attached as the cause
+  * @throws FlowSynchronizationException if the stop/disable operation itself failed
+  * @throws TimeoutException if the deadline passes and the component is not a processor that gets terminated
+  */
+ private void waitForStopCompletion(final Future<?> future, final Object component, final long timeout, final FlowSynchronizationOptions.ComponentStopTimeoutAction timeoutAction)
+         throws InterruptedException, FlowSynchronizationException, TimeoutException {
+     try {
+         // Convert the absolute deadline into the remaining wait time
+         future.get(timeout - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+     } catch (final InterruptedException e) {
+         // Re-throw with a descriptive message, but keep the original exception as the cause
+         final InterruptedException interrupted = new InterruptedException("Interrupted while waiting for " + component + " to stop/disable");
+         interrupted.initCause(e);
+         throw interrupted;
+     } catch (final ExecutionException ee) {
+         throw new FlowSynchronizationException("Failed to stop/disable " + component, ee.getCause());
+     } catch (final TimeoutException e) {
+         // On timeout, if action is to terminate and the component is a processor, terminate it.
+         if (component instanceof ProcessorNode) {
+             switch (timeoutAction) {
+                 case THROW_TIMEOUT_EXCEPTION:
+                     throw e;
+                 case TERMINATE:
+                 default:
+                     ((ProcessorNode) component).terminate();
+                     return;
+             }
+         }
+
+         // Anything that is not a processor cannot be terminated: report the timeout, keeping the original as the cause
+         final TimeoutException timedOut = new TimeoutException("Timed out waiting for " + component + " to stop/disable");
+         timedOut.initCause(e);
+         throw timedOut;
+     }
+ }
private void updateControllerService(final ControllerServiceNode service, final VersionedControllerService proposed) {
LOG.debug("Updating {}", service);
@@ -1053,7 +1156,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
service.setComments(proposed.getComments());
service.setName(proposed.getName());
- final Map<String, String> properties = populatePropertiesMap(service, proposed.getProperties(), proposed.getPropertyDescriptors(), service.getProcessGroup());
+ final Map<String, String> properties = populatePropertiesMap(service, proposed.getProperties(), service.getProcessGroup());
service.setProperties(properties, true);
if (!isEqual(service.getBundleCoordinate(), proposed.getBundle())) {
@@ -1067,8 +1170,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
}
}
- private Map<String, String> populatePropertiesMap(final ComponentNode componentNode, final Map<String, String> proposedProperties,
- final Map<String, VersionedPropertyDescriptor> proposedDescriptors, final ProcessGroup group) {
+ private Map<String, String> populatePropertiesMap(final ComponentNode componentNode, final Map<String, String> proposedProperties, final ProcessGroup group) {
// Explicitly set all existing properties to null, except for sensitive properties, so that if there isn't an entry in the proposedProperties
// it will get removed from the processor. We don't do this for sensitive properties because when we retrieve the VersionedProcessGroup from registry,
@@ -1088,11 +1190,10 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
.forEach(updatedPropertyNames::add);
for (final String propertyName : updatedPropertyNames) {
- final VersionedPropertyDescriptor descriptor = proposedDescriptors.get(propertyName);
+ final PropertyDescriptor descriptor = componentNode.getPropertyDescriptor(propertyName);
String value;
- if (descriptor != null && descriptor.getIdentifiesControllerService()) {
-
+ if (descriptor != null && descriptor.getControllerServiceDefinition() != null ) {
// Need to determine if the component's property descriptor for this service is already set to an id
// of an existing service that is outside the current processor group, and if it is we want to leave
// the property set to that value
@@ -1174,6 +1275,434 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
}
}
+ /**
+  * Verifies that the given Parameter Context can be synchronized with the proposed version.
+  * When adding, the proposed name must not already be in use. When removing, no other context may inherit from
+  * the existing one. When updating, each parameter present on both sides must agree on its sensitivity flag and
+  * every inherited context named by the proposal must exist in the current flow.
+  *
+  * @param parameterContext the existing context, or null if it is to be created
+  * @param proposed the proposed versioned context, or null if the existing context is to be removed
+  * @throws FlowSynchronizationException if any of the add/update checks fails
+  */
+ private void verifyCanSynchronize(final ParameterContext parameterContext, final VersionedParameterContext proposed) throws FlowSynchronizationException {
+     // Adding: the proposed name must still be free
+     if (parameterContext == null) {
+         if (getParameterContextByName(proposed.getName()) != null) {
+             throw new FlowSynchronizationException("Cannot synchronize flow with proposed Parameter Context because a Parameter Context already exists with the name " + proposed.getName());
+         }
+     }
+
+     // Removing: no other context may inherit from this one
+     if (proposed == null) {
+         verifyNotInherited(parameterContext.getIdentifier());
+     }
+
+     // The remaining checks only apply when an existing context is updated
+     if (parameterContext == null || proposed == null) {
+         return;
+     }
+
+     // Sensitivity of parameters present on both sides must match
+     for (final VersionedParameter proposedParameter : proposed.getParameters()) {
+         final Optional<Parameter> existingParameter = parameterContext.getParameter(proposedParameter.getName());
+         if (!existingParameter.isPresent()) {
+             continue;
+         }
+
+         final boolean currentlySensitive = existingParameter.get().getDescriptor().isSensitive();
+         if (currentlySensitive != proposedParameter.isSensitive()) {
+             throw new FlowSynchronizationException("Cannot synchronize flow with proposed Parameter Context because the Parameter [" + proposedParameter.getName() + "] in " +
+                 parameterContext + " has a sensitivity flag of " + currentlySensitive + " while the proposed version has a sensitivity flag of " + proposedParameter.isSensitive());
+         }
+     }
+
+     // Every context the proposal inherits from must already exist
+     final List<String> inheritedNames = proposed.getInheritedParameterContexts();
+     if (inheritedNames == null) {
+         return;
+     }
+
+     for (final String inheritedName : inheritedNames) {
+         if (getParameterContextByName(inheritedName) == null) {
+             throw new FlowSynchronizationException("Cannot synchronize flow with proposed Parameter Context because proposed version inherits from Parameter Context with name " +
+                 inheritedName + " but there is no Parameter Context with that name in the current flow");
+         }
+     }
+ }
+
+ /**
+  * Synchronizes a single Parameter Context with the proposed versioned context.
+  * A null parameterContext means the proposed context is created; a null proposed means the existing context is
+  * unbound from its Process Groups and removed; when both are non-null the parameters, name, description and
+  * inherited contexts are updated; when both are null nothing is done.
+  * Processors and Controller Services referencing a changed parameter are stopped first and restarted in a finally block.
+  *
+  * @param parameterContext the existing context, or null if it is to be created
+  * @param proposed the proposed versioned context, or null if the existing context is to be removed
+  * @param synchronizationOptions supplies the stop timeout, timeout action, ID generator and component scheduler
+  * @throws FlowSynchronizationException if verifyCanSynchronize rejects the proposal
+  */
+ @Override
+ public void synchronize(final ParameterContext parameterContext, final VersionedParameterContext proposed, final FlowSynchronizationOptions synchronizationOptions)
+ throws FlowSynchronizationException, TimeoutException, InterruptedException {
+
+ if (parameterContext == null && proposed == null) {
+ return;
+ }
+
+ // Absolute deadline (epoch millis), derived from the configured component stop timeout
+ final long timeout = System.currentTimeMillis() + synchronizationOptions.getComponentStopTimeout().toMillis();
+ // Verification runs before the scheduler is paused, so a rejected proposal leaves the scheduler untouched
+ verifyCanSynchronize(parameterContext, proposed);
+
+ synchronizationOptions.getComponentScheduler().pause();
+ try {
+ // Make sure that we have a unique name and add the Parameter Context if none exists
+ if (parameterContext == null) {
+ final String contextId = synchronizationOptions.getComponentIdGenerator().generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), "Controller");
+ final ParameterContext added = createParameterContext(proposed, contextId, Collections.emptyMap());
+ LOG.info("Successfully synchronized {} by adding it to the flow", added);
+ return;
+ }
+
+ final ParameterReferenceManager referenceManager = parameterContext.getParameterReferenceManager();
+ // Names of parameters that differ between the current and proposed context (all current names when proposed is null)
+ final Set<String> updatedParameterNames = getUpdatedParameterNames(parameterContext, proposed);
+
+ final Set<ComponentNode> componentsToRestart = new HashSet<>();
+ final Set<ControllerServiceNode> servicesToRestart = new HashSet<>();
+ try {
+ // Stop components necessary
+ for (final String paramName : updatedParameterNames) {
+ final Set<ProcessorNode> processors = referenceManager.getProcessorsReferencing(parameterContext, paramName);
+ componentsToRestart.addAll(stopOrTerminate(processors, timeout, synchronizationOptions));
+
+ final Set<ControllerServiceNode> referencingServices = referenceManager.getControllerServicesReferencing(parameterContext, paramName);
+
+ for (final ControllerServiceNode referencingService : referencingServices) {
+ stopControllerService(referencingService, null, timeout, synchronizationOptions.getComponentStopTimeoutAction(), componentsToRestart, servicesToRestart);
+ servicesToRestart.add(referencingService);
+ }
+ }
+
+ // Remove or update parameter context.
+ final ParameterContextManager contextManager = context.getFlowManager().getParameterContextManager();
+ if (proposed == null) {
+ // Unbind every Process Group from the context before removing it
+ for (final ProcessGroup groupBound : referenceManager.getProcessGroupsBound(parameterContext)) {
+ groupBound.setParameterContext(null);
+ }
+
+ contextManager.removeParameterContext(parameterContext.getIdentifier());
+ LOG.info("Successfully synchronized {} by removing it from the flow", parameterContext);
+ } else {
+ final Map<String, Parameter> updatedParameters = createParameterMap(proposed.getParameters());
+
+ // Resolve inherited context names to instances; verifyCanSynchronize has already checked that each name exists
+ final Map<String, ParameterContext> contextsByName = contextManager.getParameterContextNameMapping();
+ final List<ParameterContext> inheritedContexts = new ArrayList<>();
+ final List<String> inheritedContextNames = proposed.getInheritedParameterContexts();
+ if (inheritedContextNames != null) {
+ for (final String inheritedContextName : inheritedContextNames) {
+ final ParameterContext inheritedContext = contextsByName.get(inheritedContextName);
+ inheritedContexts.add(inheritedContext);
+ }
+ }
+
+ parameterContext.setParameters(updatedParameters);
+ parameterContext.setName(proposed.getName());
+ parameterContext.setDescription(proposed.getDescription());
+ parameterContext.setInheritedParameterContexts(inheritedContexts);
+ LOG.info("Successfully synchronized {} by updating it to match the proposed version", parameterContext);
+ }
+ } finally {
+ // TODO: How will this behave if Controller Service was changed to DISABLING but then timed out waiting for it to disable?
+ // In that case, I think this will fail to enable the controller services, and as a result it will remain DISABLED.
+ // We probably want to update the logic here so that it marks a desired state of ENABLED and when the service finally transitions
+ // to DISABLED we enable it.
+ context.getControllerServiceProvider().enableControllerServicesAsync(servicesToRestart);
+
+ // We don't use ControllerServiceProvider.scheduleReferencingComponents here, as we do when dealing with a Controller Service
+ // because if we timeout while waiting for a Controller Service to stop, then that Controller Service won't be in our list of Controller Services
+ // to re-enable. As a result, we don't have the appropriate Controller Service to pass to the scheduleReferencingComponents.
+ for (final ComponentNode stoppedComponent : componentsToRestart) {
+ if (stoppedComponent instanceof Connectable) {
+ context.getComponentScheduler().startComponent((Connectable) stoppedComponent);
+ }
+ }
+ }
+ } finally {
+ synchronizationOptions.getComponentScheduler().resume();
+ }
+ }
+
+ /**
+  * Determines which parameter names differ between the current Parameter Context and the proposed one.
+  * A name is reported when it exists on only one side, or exists on both sides with unequal values.
+  *
+  * @param parameterContext the current context
+  * @param proposed the proposed context; null is treated as having no parameters
+  * @return the names of all added, removed or changed parameters
+  */
+ protected Set<String> getUpdatedParameterNames(final ParameterContext parameterContext, final VersionedParameterContext proposed) {
+     final Map<String, String> currentValues = new HashMap<>();
+     for (final Parameter parameter : parameterContext.getParameters().values()) {
+         currentValues.put(parameter.getDescriptor().getName(), parameter.getValue());
+     }
+
+     final Map<String, String> proposedValues = new HashMap<>();
+     if (proposed != null) {
+         for (final VersionedParameter versionedParameter : proposed.getParameters()) {
+             proposedValues.put(versionedParameter.getName(), versionedParameter.getValue());
+         }
+     }
+
+     final Set<String> changedNames = new HashSet<>();
+
+     // Current parameters that are missing from the proposal or carry a different value there
+     for (final Map.Entry<String, String> current : currentValues.entrySet()) {
+         final String name = current.getKey();
+         final boolean unchanged = proposedValues.containsKey(name) && Objects.equals(proposedValues.get(name), current.getValue());
+         if (!unchanged) {
+             changedNames.add(name);
+         }
+     }
+
+     // Proposed parameters that are missing from the current context or carry a different value there
+     for (final Map.Entry<String, String> candidate : proposedValues.entrySet()) {
+         final String name = candidate.getKey();
+         final boolean unchanged = currentValues.containsKey(name) && Objects.equals(currentValues.get(name), candidate.getValue());
+         if (!unchanged) {
+             changedNames.add(name);
+         }
+     }
+
+     return changedNames;
+ }
+
+ /**
+  * Synchronizes the settings of a Process Group with the proposed versioned group.
+  * A null proposed means the existing group is drained, stopped, has its Controller Services disabled and is removed
+  * from its parent; a null processGroup means a new group is created under parentGroup; when both are null nothing is done.
+  * Running processors and active Controller Services affected by a change of Parameter Context name or of variables
+  * are stopped before the settings are applied and restarted in a finally block.
+  * The options' component scheduler is paused for the duration of the call and resumed in a finally block.
+  *
+  * @param processGroup the existing group, or null if it is to be created
+  * @param proposed the proposed versioned group, or null if the existing group is to be removed
+  * @param parentGroup the parent under which a new group is created; only used when processGroup is null
+  * @param synchronizationOptions supplies the stop timeout, ID generator and component scheduler
+  * @throws FlowSynchronizationException if Controller Services cannot be disabled or the proposal names a Parameter Context that does not exist
+  * @throws TimeoutException if waiting for Controller Services to disable exceeds the deadline
+  * @throws InterruptedException if interrupted while waiting
+  */
+ @Override
+ public void synchronizeProcessGroupSettings(final ProcessGroup processGroup, final VersionedProcessGroup proposed, final ProcessGroup parentGroup,
+                                             final FlowSynchronizationOptions synchronizationOptions)
+         throws FlowSynchronizationException, TimeoutException, InterruptedException {
+
+     if (processGroup == null && proposed == null) {
+         return;
+     }
+
+     // Absolute deadline (epoch millis); must be converted to a remaining duration before being handed to Future.get
+     final long timeout = System.currentTimeMillis() + synchronizationOptions.getComponentStopTimeout().toMillis();
+
+     synchronizationOptions.getComponentScheduler().pause();
+     try {
+         // Check if we need to delete the Process Group
+         if (proposed == null) {
+             // Ensure that there are no incoming connections
+             processGroup.getInputPorts().forEach(Port::verifyCanDelete);
+
+             // Bleed out the data by stopping all input ports and source processors, then waiting
+             // for all connections to become empty
+             bleedOut(processGroup, timeout, synchronizationOptions);
+
+             processGroup.stopProcessing();
+             waitFor(timeout, () -> isDoneProcessing(processGroup));
+
+             // Disable all Controller Services
+             final Future<Void> disableServicesFuture = context.getControllerServiceProvider().disableControllerServicesAsync(processGroup.findAllControllerServices());
+             try {
+                 // Wait only for the time remaining until the deadline; passing the absolute deadline would wait practically forever
+                 disableServicesFuture.get(timeout - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+             } catch (final ExecutionException ee) {
+                 throw new FlowSynchronizationException("Could not synchronize flow with proposal due to: failed to disable Controller Services", ee.getCause());
+             }
+
+             // Remove all templates from the group and remove the group
+             processGroup.getTemplates().forEach(processGroup::removeTemplate);
+             processGroup.getParent().removeProcessGroup(processGroup);
+
+             LOG.info("Successfully synchronized {} by removing it from the flow", processGroup);
+             return;
+         }
+
+         // Create the Process Group if it doesn't exist
+         final ProcessGroup groupToUpdate;
+         if (processGroup == null) {
+             final String groupId = synchronizationOptions.getComponentIdGenerator().generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), parentGroup.getIdentifier());
+             final ProcessGroup group = context.getFlowManager().createProcessGroup(groupId);
+             group.setVersionedComponentId(proposed.getIdentifier());
+             group.setParent(parentGroup);
+             group.setName(proposed.getName());
+
+             parentGroup.addProcessGroup(group);
+             groupToUpdate = group;
+         } else {
+             groupToUpdate = processGroup;
+         }
+
+         // Ensure that the referenced Parameter Context is valid
+         final ParameterContext parameterContext = proposed.getParameterContextName() == null ? null :
+             context.getFlowManager().getParameterContextManager().getParameterContextNameMapping().get(proposed.getParameterContextName());
+
+         if (parameterContext == null && proposed.getParameterContextName() != null) {
+             throw new FlowSynchronizationException("Cannot synchronize flow with proposed version because proposal indicates that Process Group " + groupToUpdate +
+                 " should use Parameter Context with name [" + proposed.getParameterContextName() + "] but no Parameter Context exists with that name");
+         }
+
+         // Determine which components must be stopped/disabled based on Parameter Context name changing
+         final Set<ProcessorNode> processorsToStop = new HashSet<>();
+         final Set<ControllerServiceNode> controllerServicesToStop = new HashSet<>();
+         final String currentParameterContextName = groupToUpdate.getParameterContext() == null ? null : groupToUpdate.getParameterContext().getName();
+         if (!Objects.equals(currentParameterContextName, proposed.getParameterContextName())) {
+             groupToUpdate.getProcessors().stream()
+                 .filter(ProcessorNode::isRunning)
+                 .filter(ProcessorNode::isReferencingParameter)
+                 .forEach(processorsToStop::add);
+
+             final Set<ControllerServiceNode> servicesReferencingParams = groupToUpdate.getControllerServices(false).stream()
+                 .filter(ControllerServiceNode::isReferencingParameter)
+                 .collect(Collectors.toSet());
+
+             for (final ControllerServiceNode service : servicesReferencingParams) {
+                 if (!service.isActive()) {
+                     continue;
+                 }
+
+                 controllerServicesToStop.add(service);
+
+                 // Active services that (transitively) reference this service must be disabled as well
+                 for (final ControllerServiceNode referencingService : service.getReferences().findRecursiveReferences(ControllerServiceNode.class)) {
+                     if (!referencingService.isActive()) {
+                         continue;
+                     }
+
+                     controllerServicesToStop.add(referencingService);
+                 }
+             }
+
+             // Running processors that (transitively) reference any service being disabled must be stopped too
+             for (final ControllerServiceNode service : controllerServicesToStop) {
+                 service.getReferences().findRecursiveReferences(ProcessorNode.class).stream()
+                     .filter(ProcessorNode::isRunning)
+                     .forEach(processorsToStop::add);
+             }
+         }
+
+         // Determine which components must be stopped based on changes to Variable Registry.
+         final Set<String> updatedVariableNames = getUpdatedVariableNames(groupToUpdate.getVariableRegistry(), proposed.getVariables() == null ? Collections.emptyMap() : proposed.getVariables());
+         if (!updatedVariableNames.isEmpty()) {
+             for (final String variableName : updatedVariableNames) {
+                 final Set<ComponentNode> affectedComponents = groupToUpdate.getComponentsAffectedByVariable(variableName);
+                 for (final ComponentNode component : affectedComponents) {
+                     if (component instanceof ProcessorNode) {
+                         final ProcessorNode processor = (ProcessorNode) component;
+                         if (processor.isRunning()) {
+                             processorsToStop.add(processor);
+                         }
+                     } else if (component instanceof ControllerServiceNode) {
+                         final ControllerServiceNode service = (ControllerServiceNode) component;
+                         if (service.isActive()) {
+                             controllerServicesToStop.add(service);
+                         }
+                     }
+                 }
+             }
+         }
+
+         try {
+             // Stop all necessary running processors
+             stopOrTerminate(processorsToStop, timeout, synchronizationOptions);
+
+             // Stop all necessary enabled/active Controller Services
+             final Future<Void> serviceDisableFuture = context.getControllerServiceProvider().disableControllerServicesAsync(controllerServicesToStop);
+             try {
+                 serviceDisableFuture.get(timeout - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+             } catch (ExecutionException e) {
+                 throw new FlowSynchronizationException("Failed to disable Controller Services necessary in order to perform update of Process Group", e);
+             }
+
+             // Update the Process Group
+             groupToUpdate.setDefaultBackPressureDataSizeThreshold(proposed.getDefaultBackPressureDataSizeThreshold());
+             groupToUpdate.setDefaultBackPressureObjectThreshold(proposed.getDefaultBackPressureObjectThreshold());
+             groupToUpdate.setDefaultFlowFileExpiration(proposed.getDefaultFlowFileExpiration());
+             groupToUpdate.setFlowFileConcurrency(proposed.getFlowFileConcurrency() == null ? FlowFileConcurrency.UNBOUNDED : FlowFileConcurrency.valueOf(proposed.getFlowFileConcurrency()));
+             groupToUpdate.setFlowFileOutboundPolicy(proposed.getFlowFileOutboundPolicy() == null ? FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE :
+                 FlowFileOutboundPolicy.valueOf(proposed.getFlowFileOutboundPolicy()));
+             groupToUpdate.setParameterContext(parameterContext);
+             groupToUpdate.setVariables(proposed.getVariables());
+             groupToUpdate.setComments(proposed.getComments());
+             groupToUpdate.setName(proposed.getName());
+             groupToUpdate.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
+
+             if (processGroup == null) {
+                 LOG.info("Successfully synchronized {} by adding it to the flow", groupToUpdate);
+             } else {
+                 LOG.info("Successfully synchronized {} by updating it to match proposed version", groupToUpdate);
+             }
+         } finally {
+             // Re-enable all Controller Services that we disabled and restart all processors
+             context.getControllerServiceProvider().enableControllerServicesAsync(controllerServicesToStop);
+
+             for (final ProcessorNode processor : processorsToStop) {
+                 processor.getProcessGroup().startProcessor(processor, false);
+             }
+         }
+     } finally {
+         synchronizationOptions.getComponentScheduler().resume();
+     }
+ }
+
+ /**
+  * Reports whether nothing in the given Process Group, or in any of its descendant groups, is still running.
+  * Processors, input ports, output ports and the ports of Remote Process Groups are all checked.
+  *
+  * @param group the group to inspect
+  * @return true if no checked component reports itself as running, false otherwise
+  */
+ private boolean isDoneProcessing(final ProcessGroup group) {
+     if (group.getProcessors().stream().anyMatch(ProcessorNode::isRunning)) {
+         return false;
+     }
+
+     if (group.getInputPorts().stream().anyMatch(port -> port.isRunning())) {
+         return false;
+     }
+
+     if (group.getOutputPorts().stream().anyMatch(port -> port.isRunning())) {
+         return false;
+     }
+
+     final boolean remotePortRunning = group.getRemoteProcessGroups().stream()
+         .anyMatch(rpg -> rpg.getInputPorts().stream().anyMatch(port -> port.isRunning())
+             || rpg.getOutputPorts().stream().anyMatch(port -> port.isRunning()));
+     if (remotePortRunning) {
+         return false;
+     }
+
+     // Done only if every child group is done as well
+     return group.getProcessGroups().stream().allMatch(this::isDoneProcessing);
+ }
+
+ /**
+  * Stops the group's input ports and the processors selected by isSourceProcessor, then waits until every
+  * connection in the group is empty or the deadline passes.
+  *
+  * @param processGroup the group to drain
+  * @param timeout absolute deadline in epoch milliseconds
+  * @param synchronizationOptions options handed to stopOrTerminate
+  */
+ private void bleedOut(final ProcessGroup processGroup, final long timeout, final FlowSynchronizationOptions synchronizationOptions)
+         throws FlowSynchronizationException, TimeoutException, InterruptedException {
+     for (final Port inputPort : processGroup.getInputPorts()) {
+         processGroup.stopInputPort(inputPort);
+     }
+
+     final Set<ProcessorNode> sources = new HashSet<>();
+     for (final ProcessorNode candidate : processGroup.findAllProcessors()) {
+         if (isSourceProcessor(candidate)) {
+             sources.add(candidate);
+         }
+     }
+
+     stopOrTerminate(sources, timeout, synchronizationOptions);
+
+     // Snapshot the connections once; the wait below polls their queues
+     final List<Connection> allConnections = processGroup.findAllConnections();
+     waitFor(timeout, () -> connectionsEmpty(allConnections));
+ }
+
+ /**
+  * Polls the given condition every 10 milliseconds until it holds or the deadline has passed.
+  * Returns normally in both cases; callers cannot tell from the return whether the condition was met.
+  *
+  * @param timeout absolute deadline in epoch milliseconds
+  * @param condition the condition to poll; not evaluated once the deadline has passed
+  * @throws InterruptedException if interrupted while sleeping between polls
+  */
+ private void waitFor(final long timeout, final BooleanSupplier condition) throws InterruptedException {
+     while (true) {
+         final boolean deadlinePassed = System.currentTimeMillis() > timeout;
+         if (deadlinePassed || condition.getAsBoolean()) {
+             return;
+         }
+
+         Thread.sleep(10L);
+     }
+ }
+
+ /**
+  * Reports whether the FlowFile queue of every given connection is empty.
+  *
+  * @param connections the connections to inspect
+  * @return true if no connection has a non-empty queue (including when the collection is empty)
+  */
+ private boolean connectionsEmpty(final Collection<Connection> connections) {
+     return connections.stream()
+         .allMatch(connection -> connection.getFlowFileQueue().isEmpty());
+ }
+
+ /**
+  * Reports whether the given processor is a source of data, i.e. no other component feeds it.
+  * Connections that loop from the processor back to itself are ignored, so a processor whose only incoming
+  * connections are self-loops still counts as a source.
+  *
+  * @param processor the processor to inspect
+  * @return true if none of the processor's incoming connections originates from a different component
+  */
+ private boolean isSourceProcessor(final ProcessorNode processor) {
+     // noneMatch rather than anyMatch: a processor with an upstream component is NOT a source
+     return processor.getIncomingConnections().stream()
+         .noneMatch(connection -> connection.getSource() != processor);
+ }
+
+ /**
+  * Determines which variable names differ between the given registry and the updated variables.
+  * A name is reported when its updated value is not equal to the current value, or when it currently exists
+  * but is absent from the updated variables.
+  *
+  * @param variableRegistry the registry holding the current variables
+  * @param updatedVariables the proposed variables, keyed by name
+  * @return the names of all changed, added or removed variables
+  */
+ private Set<String> getUpdatedVariableNames(final ComponentVariableRegistry variableRegistry, final Map<String, String> updatedVariables) {
+     // Flatten the registry into a name -> value view
+     final Map<String, String> existingVariables = new HashMap<>();
+     variableRegistry.getVariableMap().forEach((descriptor, value) -> existingVariables.put(descriptor.getName(), value));
+
+     // Names whose proposed value is not equal to the current one
+     final Set<String> changedNames = updatedVariables.entrySet().stream()
+         .filter(entry -> !Objects.equals(existingVariables.get(entry.getKey()), entry.getValue()))
+         .map(Map.Entry::getKey)
+         .collect(Collectors.toCollection(HashSet::new));
+
+     // Names that exist today but are no longer part of the proposal
+     existingVariables.keySet().stream()
+         .filter(name -> !updatedVariables.containsKey(name))
+         .forEach(changedNames::add);
+
+     return changedNames;
+ }
+
+
+ /**
+  * Ensures that no Parameter Context in the flow inherits from the context with the given ID.
+  *
+  * @param parameterContextId the identifier of the context that is about to be deleted
+  * @throws IllegalStateException if any context lists the given context among its inherited contexts
+  */
+ private void verifyNotInherited(final String parameterContextId) {
+     for (final ParameterContext candidate : context.getFlowManager().getParameterContextManager().getParameterContexts()) {
+         for (final ParameterContext inherited : candidate.getInheritedParameterContexts()) {
+             if (inherited.getIdentifier().equals(parameterContextId)) {
+                 throw new IllegalStateException(String.format("Cannot delete Parameter Context with ID [%s] because it is referenced by at least one Parameter Context [%s]",
+                     parameterContextId, candidate.getIdentifier()));
+             }
+         }
+     }
+ }
private void updateParameterContext(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, VersionedParameterContext> versionedParameterContexts,
final ComponentIdGenerator componentIdGenerator) {
@@ -1255,10 +1784,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
}
private ParameterContext getParameterContextByName(final String contextName) {
- return context.getFlowManager().getParameterContextManager().getParameterContexts().stream()
- .filter(context -> context.getName().equals(contextName))
- .findAny()
- .orElse(null);
+ // Look the context up in the manager's name mapping instead of scanning every Parameter Context
+ return context.getFlowManager().getParameterContextManager().getParameterContextNameMapping().get(contextName);
}
private ParameterContext createParameterContextWithoutReferences(final VersionedParameterContext versionedParameterContext) {
@@ -1287,8 +1813,28 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
private ParameterContext createParameterContext(final VersionedParameterContext versionedParameterContext, final String parameterContextId,
final Map<String, VersionedParameterContext> versionedParameterContexts) {
+
+ // Convert the versioned parameters into a name -> Parameter map
+ final Map<String, Parameter> parameters = createParameterMap(versionedParameterContext.getParameters());
+
+ // Translate each inherited context name into a reference id via createParameterReferenceId
+ final List<String> parameterContextRefs = new ArrayList<>();
+ if (versionedParameterContext.getInheritedParameterContexts() != null) {
+ versionedParameterContext.getInheritedParameterContexts().stream()
+ .map(name -> createParameterReferenceId(name, versionedParameterContexts))
+ .forEach(parameterContextRefs::add);
+ }
+
+ // The creation runs inside a callback, so the created context is handed back through an AtomicReference
+ final AtomicReference<ParameterContext> contextReference = new AtomicReference<>();
+ context.getFlowManager().withParameterContextResolution(() -> {
+ final ParameterContext created = context.getFlowManager().createParameterContext(parameterContextId, versionedParameterContext.getName(), parameters, parameterContextRefs);
+ contextReference.set(created);
+ });
+
+ return contextReference.get();
+ }
+
+ private Map<String, Parameter> createParameterMap(final Collection<VersionedParameter> versionedParameters) {
final Map<String, Parameter> parameters = new HashMap<>();
- for (final VersionedParameter versionedParameter : versionedParameterContext.getParameters()) {
+ for (final VersionedParameter versionedParameter : versionedParameters) {
final ParameterDescriptor descriptor = new ParameterDescriptor.Builder()
.name(versionedParameter.getName())
.description(versionedParameter.getDescription())
@@ -1299,14 +1845,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
parameters.put(versionedParameter.getName(), parameter);
}
- final List<String> parameterContextRefs = new ArrayList<>();
- if (versionedParameterContext.getInheritedParameterContexts() != null) {
- versionedParameterContext.getInheritedParameterContexts().stream()
- .map(name -> createParameterReferenceId(name, versionedParameterContexts))
- .forEach(parameterContextRefs::add);
- }
-
- return context.getFlowManager().createParameterContext(parameterContextId, versionedParameterContext.getName(), parameters, parameterContextRefs);
+ return parameters;
}
private String createParameterReferenceId(final String parameterContextName, final Map<String, VersionedParameterContext> versionedParameterContexts) {
@@ -1330,7 +1869,6 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
return selectedParameterContext;
}
-
private void addMissingConfiguration(final VersionedParameterContext versionedParameterContext, final ParameterContext currentParameterContext,
final Map<String, VersionedParameterContext> versionedParameterContexts) {
final Map<String, Parameter> parameters = new HashMap<>();
@@ -1353,7 +1891,6 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
currentParameterContext.setParameters(parameters);
-
// If the current parameter context doesn't have any inherited param contexts but the versioned one does,
// add the versioned ones.
if (versionedParameterContext.getInheritedParameterContexts() != null && !versionedParameterContext.getInheritedParameterContexts().isEmpty()
@@ -1406,12 +1943,66 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
}
}
- private void updateFunnel(final Funnel funnel, final VersionedFunnel proposed) {
- funnel.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
- }
+ /**
+  * Synchronizes a Funnel with its proposed versioned definition. A null {@code proposed} removes the
+  * existing funnel, a null {@code funnel} adds a new one to {@code group}, and otherwise the existing
+  * funnel is updated. When both are null this is a no-op.
+  *
+  * @param funnel the existing funnel, or null if it must be created
+  * @param proposed the proposed definition, or null if the funnel must be removed
+  * @param group the group that a newly created funnel is added to
+  * @param synchronizationOptions supplies the component stop timeout and the component id generator
+  */
+ @Override
+ public void synchronize(final Funnel funnel, final VersionedFunnel proposed, final ProcessGroup group, final FlowSynchronizationOptions synchronizationOptions)
+ throws FlowSynchronizationException, TimeoutException, InterruptedException {
- private Funnel addFunnel(final ProcessGroup destination, final VersionedFunnel proposed, final ComponentIdGenerator componentIdGenerator) {
- final String id = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destination.getIdentifier());
+ if (funnel == null && proposed == null) {
+ return;
+ }
+
+ // Absolute deadline (epoch millis) shared by every wait performed below.
+ final long timeout = System.currentTimeMillis() + synchronizationOptions.getComponentStopTimeout().toMillis();
+
+ // Fail fast, before anything is stopped, if the requested change is not possible.
+ if (proposed == null) {
+ verifyCanDelete(funnel, timeout);
+ } else if (funnel != null) {
+ funnel.verifyCanUpdate();
+ }
+
+ final Set<Connectable> toRestart = new HashSet<>();
+ try {
+ if (proposed == null) {
+ // Components fed by the funnel are stopped before the funnel is removed; they are restarted in the finally block.
+ final Set<Connectable> stoppedDownstream = stopDownstreamComponents(funnel, timeout, synchronizationOptions);
+ toRestart.addAll(stoppedDownstream);
+
+ funnel.getProcessGroup().removeFunnel(funnel);
+ LOG.info("Successfully synchronized {} by removing it from the flow", funnel);
+ } else if (funnel == null) {
+ final Funnel added = addFunnel(group, proposed, synchronizationOptions.getComponentIdGenerator());
+ LOG.info("Successfully synchronized {} by adding it to the flow", added);
+ } else {
+ updateFunnel(funnel, proposed);
+ LOG.info("Successfully synchronized {} by updating it to match proposed version", funnel);
+ }
+ } finally {
+ // Restart any components that need to be restarted.
+ for (final Connectable stoppedComponent : toRestart) {
+ context.getComponentScheduler().startComponent(stoppedComponent);
+ }
+ }
+ }
+
+ /**
+  * Synchronizes a Label with its proposed versioned definition: removes the label when nothing is
+  * proposed, adds one to {@code group} when no label exists yet, and updates it otherwise.
+  * Does nothing when both arguments are null.
+  */
+ @Override
+ public void synchronize(final Label label, final VersionedLabel proposed, final ProcessGroup group, final FlowSynchronizationOptions synchronizationOptions) {
+     final boolean labelExists = label != null;
+     final boolean definitionProposed = proposed != null;
+
+     if (!labelExists && !definitionProposed) {
+         return;
+     }
+
+     if (!definitionProposed) {
+         label.getProcessGroup().removeLabel(label);
+         return;
+     }
+
+     if (!labelExists) {
+         addLabel(group, proposed, synchronizationOptions.getComponentIdGenerator());
+         return;
+     }
+
+     updateLabel(label, proposed);
+ }
+
+ /**
+  * Moves the given funnel to the position carried by the proposed definition.
+  */
+ private void updateFunnel(final Funnel funnel, final VersionedFunnel proposed) {
+     final double proposedX = proposed.getPosition().getX();
+     final double proposedY = proposed.getPosition().getY();
+     funnel.setPosition(new Position(proposedX, proposedY));
+ }
+
+ private Funnel addFunnel(final ProcessGroup destination, final VersionedFunnel proposed, final ComponentIdGenerator componentIdGenerator) {
+ final String id = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destination.getIdentifier());
final Funnel funnel = context.getFlowManager().createFunnel(id);
funnel.setVersionedComponentId(proposed.getIdentifier());
destination.addFunnel(funnel);
@@ -1445,6 +2036,82 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
port.setName(name);
}
+ /**
+  * Verifies that the given port can be synchronized with the proposed definition. With no proposed
+  * definition the port must be deletable; otherwise the proposed component type must be a port type.
+  *
+  * @throws FlowSynchronizationException if the proposed component type is neither INPUT_PORT nor OUTPUT_PORT
+  */
+ private void verifyCanSynchronize(final Port port, final VersionedPort proposed, final long timeout) throws InterruptedException, TimeoutException, FlowSynchronizationException {
+     if (proposed != null) {
+         final ComponentType proposedType = proposed.getComponentType();
+         final boolean portType = proposedType == ComponentType.INPUT_PORT || proposedType == ComponentType.OUTPUT_PORT;
+         if (!portType) {
+             throw new FlowSynchronizationException("Cannot synchronize port " + port + " with the proposed Port definition because its type is "
+                 + proposedType + " and expected either an INPUT_PORT or an OUTPUT_PORT");
+         }
+     } else {
+         verifyCanDelete(port, timeout);
+     }
+ }
+
+ /**
+  * Synchronizes an input or output Port with its proposed versioned definition. A null
+  * {@code proposed} removes the port, a null {@code port} adds a new port to {@code group}, and
+  * otherwise the existing port is updated. When both are null this is a no-op.
+  *
+  * The component scheduler is paused for the duration of the change and resumed afterwards. A running
+  * port is stopped first and restarted afterwards unless it is being removed.
+  *
+  * @param port the existing port, or null if it must be created
+  * @param proposed the proposed definition, or null if the port must be removed
+  * @param group the group that a newly created port is added to
+  * @param synchronizationOptions supplies the stop timeout, scheduler and component id generator
+  * @throws FlowSynchronizationException if the proposed component is not a port type or the port cannot be deleted
+  * @throws TimeoutException if a component does not stop, or a queue does not drain, before the deadline
+  * @throws InterruptedException if interrupted while waiting for a queue to drain
+  */
+ @Override
+ public void synchronize(final Port port, final VersionedPort proposed, final ProcessGroup group, final FlowSynchronizationOptions synchronizationOptions)
+         throws FlowSynchronizationException, TimeoutException, InterruptedException {
+
+     if (port == null && proposed == null) {
+         return;
+     }
+
+     // Absolute deadline (epoch millis) shared by every wait performed below.
+     final long timeout = System.currentTimeMillis() + synchronizationOptions.getComponentStopTimeout().toMillis();
+     verifyCanSynchronize(port, proposed, timeout);
+
+     synchronizationOptions.getComponentScheduler().pause();
+     try {
+         final Set<Connectable> toRestart = new HashSet<>();
+         if (port != null) {
+             final boolean stopped = stopOrTerminate(port, timeout, synchronizationOptions);
+             // A port that is about to be removed must not be restarted.
+             if (stopped && proposed != null) {
+                 toRestart.add(port);
+             }
+         }
+
+         try {
+             if (port == null) {
+                 final ComponentType proposedType = proposed.getComponentType();
+
+                 // Keep a handle on the created port: 'port' is null in this branch, so logging it would only print "null".
+                 final Port added;
+                 if (proposedType == ComponentType.INPUT_PORT) {
+                     added = addInputPort(group, proposed, synchronizationOptions.getComponentIdGenerator(), proposed.getName());
+                 } else {
+                     added = addOutputPort(group, proposed, synchronizationOptions.getComponentIdGenerator(), proposed.getName());
+                 }
+
+                 LOG.info("Successfully synchronized {} by adding it to the flow", added);
+             } else if (proposed == null) {
+                 final Set<Connectable> stoppedDownstream = stopDownstreamComponents(port, timeout, synchronizationOptions);
+                 toRestart.addAll(stoppedDownstream);
+
+                 // Re-verify now that the port and its downstream components have been stopped.
+                 verifyCanDelete(port, timeout);
+
+                 switch (port.getConnectableType()) {
+                     case INPUT_PORT:
+                         port.getProcessGroup().removeInputPort(port);
+                         break;
+                     case OUTPUT_PORT:
+                         port.getProcessGroup().removeOutputPort(port);
+                         break;
+                 }
+
+                 LOG.info("Successfully synchronized {} by removing it from the flow", port);
+             } else {
+                 updatePort(port, proposed, proposed.getName());
+                 LOG.info("Successfully synchronized {} by updating it to match proposed version", port);
+             }
+         } finally {
+             // Restart any components that need to be restarted.
+             for (final Connectable stoppedComponent : toRestart) {
+                 context.getComponentScheduler().startComponent(stoppedComponent);
+             }
+         }
+     } finally {
+         synchronizationOptions.getComponentScheduler().resume();
+     }
+ }
+
private void updatePort(final Port port, final VersionedPort proposed, final String temporaryName) {
final String name = temporaryName != null ? temporaryName : proposed.getName();
port.setComments(proposed.getComments());
@@ -1531,6 +2198,248 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
return procNode;
}
+ /**
+  * Verifies that the given processor can be synchronized with the proposed definition. Nothing is
+  * checked when there is no existing processor, because the proposed one can always be created.
+  * An existing processor must be deletable when nothing is proposed, and updatable otherwise.
+  */
+ private void verifyCanSynchronize(final ProcessorNode processor, final VersionedProcessor proposedProcessor, final long timeout)
+         throws InterruptedException, TimeoutException, FlowSynchronizationException {
+
+     if (processor != null) {
+         if (proposedProcessor != null) {
+             // Existing processor with a proposed definition: it will be updated in place.
+             processor.verifyCanUpdate();
+         } else {
+             // Existing processor without a proposed definition: it will be removed.
+             verifyCanDelete(processor, timeout);
+         }
+     }
+ }
+
+ /**
+  * Verifies that the given component can be removed: it must have no incoming connection from another
+  * component, its own connections must be deletable (waiting for their queues to empty until the
+  * {@code timeout} deadline, given in epoch millis), and the component itself must allow deletion.
+  */
+ private void verifyCanDelete(final Connectable connectable, final long timeout) throws InterruptedException, TimeoutException, FlowSynchronizationException {
+ verifyNoIncomingConnections(connectable);
+ verifyCanDeleteConnections(connectable, timeout);
+ connectable.verifyCanDelete(true);
+ }
+
+ /**
+  * Verifies that every connection returned by {@code connectable.getConnections()} can be deleted,
+  * then waits for each of their queues to empty until the {@code timeout} deadline (epoch millis).
+  */
+ private void verifyCanDeleteConnections(final Connectable connectable, final long timeout) throws InterruptedException, TimeoutException, FlowSynchronizationException {
+ final Set<Connection> connections = connectable.getConnections();
+ // First pass: fail fast on any connection whose queue cannot drain, before waiting on any of them.
+ for (final Connection connection : connections) {
+ verifyCanDeleteWhenQueueEmpty(connection);
+ }
+
+ // Second pass: wait for each queue, giving each one only the time remaining until the shared deadline.
+ for (final Connection connection : connections) {
+ waitForQueueEmpty(connection, Duration.ofMillis(timeout - System.currentTimeMillis()));
+ }
+ }
+
+ /**
+  * Ensures that the given component has no incoming connection other than self-loops.
+  *
+  * @throws FlowSynchronizationException if another component has a connection to the given component
+  */
+ private void verifyNoIncomingConnections(final Connectable connectable) throws FlowSynchronizationException {
+     for (final Connection inbound : connectable.getIncomingConnections()) {
+         // A connection from the component to itself does not block its removal.
+         final boolean selfLoop = inbound.getSource() == connectable;
+         if (!selfLoop) {
+             throw new FlowSynchronizationException("Cannot remove " + connectable + " because it has an incoming connection from " + inbound.getSource());
+         }
+     }
+ }
+
+ /**
+  * Synchronizes a Processor with its proposed versioned definition. A null {@code proposedProcessor}
+  * removes the processor, a null {@code processor} adds a new one to {@code group}, and otherwise the
+  * existing processor is updated. When both are null this is a no-op.
+  *
+  * The component scheduler is paused for the duration of the change and resumed afterwards.
+  *
+  * @param processor the existing processor, or null if it must be created
+  * @param proposedProcessor the proposed definition, or null if the processor must be removed
+  * @param group the group that a newly created processor is added to
+  * @param synchronizationOptions supplies the stop timeout, scheduler and component id generator
+  */
+ @Override
+ public void synchronize(final ProcessorNode processor, final VersionedProcessor proposedProcessor, final ProcessGroup group, final FlowSynchronizationOptions synchronizationOptions)
+ throws FlowSynchronizationException, TimeoutException, InterruptedException {
+
+ if (processor == null && proposedProcessor == null) {
+ return;
+ }
+
+ setSynchronizationOptions(synchronizationOptions);
+ // Absolute deadline (epoch millis) shared by every wait performed below.
+ final long timeout = System.currentTimeMillis() + synchronizationOptions.getComponentStopTimeout().toMillis();
+
+ synchronizationOptions.getComponentScheduler().pause();
+ try {
+ // Stop the processor, if necessary, in order to update it.
+ final Set<Connectable> toRestart = new HashSet<>();
+ if (processor != null) {
+ final boolean stopped = stopOrTerminate(processor, timeout, synchronizationOptions);
+
+ // Only restart the processor if the proposed definition says it should be running.
+ if (stopped && proposedProcessor != null && proposedProcessor.getScheduledState() == org.apache.nifi.flow.ScheduledState.RUNNING) {
+ toRestart.add(processor);
+ }
+ }
+
+ try {
+ // Verification runs inside this try so that a failure still restarts whatever was stopped above.
+ verifyCanSynchronize(processor, proposedProcessor, timeout);
+
+ try {
+ if (proposedProcessor == null) {
+ final Set<Connectable> stoppedDownstream = stopDownstreamComponents(processor, timeout, synchronizationOptions);
+ toRestart.addAll(stoppedDownstream);
+
+ processor.getProcessGroup().removeProcessor(processor);
+ LOG.info("Successfully synchronized {} by removing it from the flow", processor);
+ } else if (processor == null) {
+ final ProcessorNode added = addProcessor(group, proposedProcessor, synchronizationOptions.getComponentIdGenerator());
+ LOG.info("Successfully synchronized {} by adding it to the flow", added);
+ } else {
+ updateProcessor(processor, proposedProcessor);
+ LOG.info("Successfully synchronized {} by updating it to match proposed version", processor);
+ }
+ } catch (final Exception e) {
+ // NOTE(review): this also wraps a TimeoutException raised by stopDownstreamComponents, so callers see
+ // a FlowSynchronizationException rather than the declared TimeoutException - confirm this is intended.
+ throw new FlowSynchronizationException("Failed to synchronize processor " + processor + " with proposed version", e);
+ }
+ } finally {
+ // Restart any components that need to be restarted.
+ for (final Connectable stoppedComponent : toRestart) {
+ context.getComponentScheduler().startComponent(stoppedComponent);
+ }
+ }
+ } finally {
+ synchronizationOptions.getComponentScheduler().resume();
+ }
+ }
+
+ /**
+  * Stops (or terminates) the destination of every connection returned by
+  * {@code component.getConnections()}.
+  *
+  * @return the destinations that were actually stopped by this call
+  */
+ private Set<Connectable> stopDownstreamComponents(final Connectable component, final long timeout, final FlowSynchronizationOptions synchronizationOptions)
+         throws FlowSynchronizationException, TimeoutException {
+
+     final Set<Connectable> halted = new HashSet<>();
+     for (final Connection outbound : component.getConnections()) {
+         final Connectable target = outbound.getDestination();
+         if (stopOrTerminate(target, timeout, synchronizationOptions)) {
+             halted.add(target);
+         }
+     }
+
+     return halted;
+ }
+
+ /**
+  * Collects the destination of every connection returned by {@code component.getConnections()},
+  * optionally together with the component itself.
+  */
+ private Set<Connectable> getDownstreamComponents(final Connectable component, final boolean includeSelf) {
+     final Set<Connectable> downstream = component.getConnections().stream()
+         .map(Connection::getDestination)
+         .collect(Collectors.toCollection(HashSet::new));
+
+     if (includeSelf) {
+         downstream.add(component);
+     }
+
+     return downstream;
+ }
+
+ /**
+  * Stops (or terminates) each of the given components.
+  *
+  * @return the subset of components that were actually stopped by this call
+  */
+ private <T extends Connectable> Set<T> stopOrTerminate(final Set<T> components, final long timeout, final FlowSynchronizationOptions synchronizationOptions)
+         throws TimeoutException, FlowSynchronizationException {
+
+     final Set<T> halted = new HashSet<>();
+     for (final T candidate : components) {
+         if (stopOrTerminate(candidate, timeout, synchronizationOptions)) {
+             halted.add(candidate);
+         }
+     }
+
+     return halted;
+ }
+
+ /**
+  * Stops the given component if it is running. Ports are stopped through their process group,
+  * processors are delegated to the ProcessorNode overload, and any other connectable type is left alone.
+  *
+  * @return true if the component was stopped by this call; false if it was not running or is of a
+  *         type that this method does not stop
+  */
+ private boolean stopOrTerminate(final Connectable component, final long timeout, final FlowSynchronizationOptions synchronizationOptions) throws TimeoutException, FlowSynchronizationException {
+ if (!component.isRunning()) {
+ return false;
+ }
+
+ final ConnectableType connectableType = component.getConnectableType();
+ switch (connectableType) {
+ case INPUT_PORT:
+ component.getProcessGroup().stopInputPort((Port) component);
+ return true;
+ case OUTPUT_PORT:
+ component.getProcessGroup().stopOutputPort((Port) component);
+ return true;
+ case PROCESSOR:
+ return stopOrTerminate((ProcessorNode) component, timeout, synchronizationOptions);
+ default:
+ return false;
+ }
+ }
+
+ /**
+  * Stops the given processor, waiting until the {@code timeout} deadline. If the stop times out, the
+  * configured timeout action decides whether the TimeoutException is rethrown or the processor is terminated.
+  *
+  * @return the result of {@code stopProcessor}, or true if the processor had to be terminated
+  */
+ private boolean stopOrTerminate(final ProcessorNode processor, final long timeout, final FlowSynchronizationOptions synchronizationOptions) throws TimeoutException, FlowSynchronizationException {
+ try {
+ LOG.debug("Stopping {} in order to synchronize it with proposed version", processor);
+ return stopProcessor(processor, timeout);
+ } catch (final TimeoutException te) {
+ switch (synchronizationOptions.getComponentStopTimeoutAction()) {
+ case THROW_TIMEOUT_EXCEPTION:
+ throw te;
+ case TERMINATE:
+ default:
+ // Any action other than THROW_TIMEOUT_EXCEPTION terminates the processor.
+ processor.terminate();
+ return true;
+ }
+ }
+ }
+
+ /**
+  * Asks the processor's group to stop the processor and waits for the returned Future until the
+  * {@code timeout} deadline (epoch millis).
+  *
+  * @return false if the processor was not running, true once the stop has completed
+  * @throws TimeoutException if the stop did not complete before the deadline
+  * @throws FlowSynchronizationException if the stop failed, or if the waiting thread was interrupted
+  */
+ private boolean stopProcessor(final ProcessorNode processor, final long timeout) throws FlowSynchronizationException, TimeoutException {
+ if (!processor.isRunning()) {
+ return false;
+ }
+
+ final Future<Void> future = processor.getProcessGroup().stopProcessor(processor);
+ try {
+ // Wait only for the time remaining until the deadline.
+ future.get(timeout - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+ return true;
+ } catch (final ExecutionException ee) {
+ throw new FlowSynchronizationException("Failed to stop processor " + processor, ee.getCause());
+ } catch (final InterruptedException ie) {
+ // Restore the interrupt flag before converting to the checked synchronization exception.
+ Thread.currentThread().interrupt();
+ throw new FlowSynchronizationException("Interrupted while waiting for processor " + processor + " to stop", ie);
+ }
+ }
+
+ /**
+  * Prepares a Controller Service for synchronization: unschedules the components that reference it
+  * and, if the service is active, disables it together with the services that reference it.
+  * Does nothing when {@code controllerService} is null.
+  *
+  * @param controllerService the service to stop, may be null
+  * @param proposed the proposed definition of the service, may be null
+  * @param timeout deadline passed to {@code waitForStopCompletion}
+  * @param timeoutAction action passed to {@code waitForStopCompletion}
+  * @param referencesStopped output collection; receives every referencing component that was unscheduled
+  * @param servicesDisabled output collection; receives the services recorded as disabled by this call
+  */
+ private void stopControllerService(final ControllerServiceNode controllerService, final VersionedControllerService proposed, final long timeout,
+ final FlowSynchronizationOptions.ComponentStopTimeoutAction timeoutAction, final Set<ComponentNode> referencesStopped,
+ final Set<ControllerServiceNode> servicesDisabled) throws FlowSynchronizationException,
+ TimeoutException, InterruptedException {
+ final ControllerServiceProvider serviceProvider = context.getControllerServiceProvider();
+ if (controllerService == null) {
+ return;
+ }
+
+ final Map<ComponentNode, Future<Void>> futures = serviceProvider.unscheduleReferencingComponents(controllerService);
+ referencesStopped.addAll(futures.keySet());
+
+ for (final Map.Entry<ComponentNode, Future<Void>> entry : futures.entrySet()) {
+ final ComponentNode component = entry.getKey();
+ final Future<Void> future = entry.getValue();
+
+ waitForStopCompletion(future, component, timeout, timeoutAction);
+ }
+
+ if (controllerService.isActive()) {
+ // If the Controller Service is active, we need to disable it. To do that, we must first disable all referencing services.
+ final List<ControllerServiceNode> referencingServices = controllerService.getReferences().findRecursiveReferences(ControllerServiceNode.class);
+
+ // The service itself is recorded only when the proposed definition does not leave it disabled.
+ if (proposed != null && proposed.getScheduledState() != org.apache.nifi.flow.ScheduledState.DISABLED) {
+ servicesDisabled.add(controllerService);
+ }
+
+ for (final ControllerServiceNode reference : referencingServices) {
+ if (reference.isActive()) {
+ servicesDisabled.add(reference);
+ }
+ }
+
+ // We want to stop all dependent services plus the controller service we are synchronizing.
+ final Set<ControllerServiceNode> servicesToStop = new HashSet<>(servicesDisabled);
+ servicesToStop.add(controllerService);
+
+ // Disable the service and wait for completion, up to the timeout allowed
+ final Future<Void> future = serviceProvider.disableControllerServicesAsync(servicesToStop);
+ waitForStopCompletion(future, controllerService, timeout, timeoutAction);
+ }
+ }
+
+
private void updateProcessor(final ProcessorNode processor, final VersionedProcessor proposed) throws ProcessorInstantiationException {
LOG.debug("Updating Processor {}", processor);
@@ -1542,7 +2451,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
processor.setName(proposed.getName());
processor.setPenalizationPeriod(proposed.getPenaltyDuration());
- final Map<String, String> properties = populatePropertiesMap(processor, proposed.getProperties(), proposed.getPropertyDescriptors(), processor.getProcessGroup());
+ final Map<String, String> properties = populatePropertiesMap(processor, proposed.getProperties(), processor.getProcessGroup());
processor.setProperties(properties, true);
processor.setRunDuration(proposed.getRunDurationMillis(), TimeUnit.MILLISECONDS);
processor.setSchedulingStrategy(SchedulingStrategy.valueOf(proposed.getSchedulingStrategy()));
@@ -1556,6 +2465,16 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
processor.setMaxBackoffPeriod(proposed.getMaxBackoffPeriod());
processor.setRetriedRelationships(proposed.getRetriedRelationships());
+ final Set<String> proposedAutoTerminated = proposed.getAutoTerminatedRelationships();
+ if (proposedAutoTerminated != null) {
+ final Set<Relationship> relationshipsToAutoTerminate = proposedAutoTerminated.stream()
+ .map(processor::getRelationship)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+
+ processor.setAutoTerminatedRelationships(relationshipsToAutoTerminate);
+ }
+
if (proposed.getRetryCount() != null) {
processor.setRetryCount(proposed.getRetryCount());
} else {
@@ -1580,7 +2499,6 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
}
}
-
private String getServiceInstanceId(final String serviceVersionedComponentId, final ProcessGroup group) {
for (final ControllerServiceNode serviceNode : group.getControllerServices(false)) {
final String versionedId = serviceNode.getVersionedComponentId().orElse(
@@ -1596,7 +2514,110 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
}
return getServiceInstanceId(serviceVersionedComponentId, parent);
+ }
+
+ /**
+  * Synchronizes a Remote Process Group (RPG) with its proposed versioned definition. A null
+  * {@code proposed} removes the RPG, a null {@code rpg} adds a new one to {@code group}, and otherwise
+  * the existing RPG is updated. When both are null this is a no-op.
+  *
+  * The component scheduler is paused for the duration of the change and resumed afterwards.
+  *
+  * @param rpg the existing RPG, or null if it must be created
+  * @param proposed the proposed definition, or null if the RPG must be removed
+  * @param group the group that a newly created RPG is added to
+  * @param synchronizationOptions supplies the stop timeout, timeout action, scheduler and component id generator
+  */
+ @Override
+ public void synchronize(final RemoteProcessGroup rpg, final VersionedRemoteProcessGroup proposed, final ProcessGroup group, final FlowSynchronizationOptions synchronizationOptions)
+ throws FlowSynchronizationException, TimeoutException, InterruptedException {
+
+ if (rpg == null && proposed == null) {
+ return;
+ }
+
+ setSynchronizationOptions(synchronizationOptions);
+ // Absolute deadline (epoch millis) shared by every wait performed below.
+ final long timeout = System.currentTimeMillis() + synchronizationOptions.getComponentStopTimeout().toMillis();
+
+ synchronizationOptions.getComponentScheduler().pause();
+ try {
+ // Stop the RPG from transmitting, if necessary, in order to update it.
+ final Set<Connectable> toRestart = new HashSet<>();
+ if (rpg != null) {
+ if (rpg.isTransmitting()) {
+ // Capture the running ports before stopping, so that the same ports can be restarted afterwards.
+ final Set<RemoteGroupPort> transmitting = getTransmittingPorts(rpg);
+
+ final Future<?> future = rpg.stopTransmitting();
+ waitForStopCompletion(future, rpg, timeout, synchronizationOptions.getComponentStopTimeoutAction());
+
+ final boolean proposedTransmitting = isTransmitting(proposed);
+ if (proposed != null && proposedTransmitting) {
+ toRestart.addAll(transmitting);
+ }
+ }
+ }
+
+ try {
+ if (proposed == null) {
+ // Stop any downstream components so that we can delete the RPG
+ for (final RemoteGroupPort outPort : rpg.getOutputPorts()) {
+ final Set<Connectable> stoppedDownstream = stopDownstreamComponents(outPort, timeout, synchronizationOptions);
+ toRestart.addAll(stoppedDownstream);
+ }
+
+ // Verify that we can delete the components
+ for (final RemoteGroupPort port : rpg.getInputPorts()) {
+ verifyCanDelete(port, timeout);
+ }
+ for (final RemoteGroupPort port : rpg.getOutputPorts()) {
+ verifyCanDelete(port, timeout);
+ }
+
+ rpg.getProcessGroup().removeRemoteProcessGroup(rpg);
+ LOG.info("Successfully synchronized {} by removing it from the flow", rpg);
+ } else if (rpg == null) {
+ final RemoteProcessGroup added = addRemoteProcessGroup(group, proposed, synchronizationOptions.getComponentIdGenerator());
+ LOG.info("Successfully synchronized {} by adding it to the flow", added);
+ } else {
+ updateRemoteProcessGroup(rpg, proposed, synchronizationOptions.getComponentIdGenerator());
+ LOG.info("Successfully synchronized {} by updating it to match proposed version", rpg);
+ }
+ } catch (final Exception e) {
+ // NOTE(review): this also wraps the TimeoutException and InterruptedException raised by the calls above,
+ // so callers see a FlowSynchronizationException instead - confirm this is intended.
+ throw new FlowSynchronizationException("Failed to synchronize " + rpg + " with proposed version", e);
+ } finally {
+ // Restart any components that need to be restarted.
+ for (final Connectable stoppedComponent : toRestart) {
+ context.getComponentScheduler().startComponent(stoppedComponent);
+ }
+ }
+ } finally {
+ synchronizationOptions.getComponentScheduler().resume();
+ }
+ }
+
+ /**
+  * Determines whether the proposed Remote Process Group should be transmitting, i.e. whether any of
+  * its input or output ports has a scheduled state of RUNNING.
+  *
+  * @param versionedRpg the proposed RPG definition, may be null
+  * @return true if at least one port is RUNNING; false if the RPG is null or has no running port
+  */
+ private boolean isTransmitting(final VersionedRemoteProcessGroup versionedRpg) {
+     if (versionedRpg == null) {
+         return false;
+     }
+
+     return containsRunningPort(versionedRpg.getInputPorts()) || containsRunningPort(versionedRpg.getOutputPorts());
+ }
+
+ /**
+  * Returns true if any port in the given collection is RUNNING. A null collection is treated as
+  * empty, so a versioned RPG whose port set was never populated does not cause a NullPointerException.
+  */
+ private boolean containsRunningPort(final Collection<VersionedRemoteGroupPort> ports) {
+     if (ports == null) {
+         return false;
+     }
+
+     for (final VersionedRemoteGroupPort port : ports) {
+         if (port.getScheduledState() == org.apache.nifi.flow.ScheduledState.RUNNING) {
+             return true;
+         }
+     }
+
+     return false;
+ }
+
+ /**
+  * Collects the input and output ports of the given Remote Process Group whose scheduled state is
+  * RUNNING. Returns an empty set when {@code rpg} is null.
+  */
+ private Set<RemoteGroupPort> getTransmittingPorts(final RemoteProcessGroup rpg) {
+     if (rpg == null) {
+         return Collections.emptySet();
+     }
+
+     final Set<RemoteGroupPort> running = new HashSet<>();
+     for (final RemoteGroupPort inputPort : rpg.getInputPorts()) {
+         if (inputPort.getScheduledState() == ScheduledState.RUNNING) {
+             running.add(inputPort);
+         }
+     }
+
+     for (final RemoteGroupPort outputPort : rpg.getOutputPorts()) {
+         if (outputPort.getScheduledState() == ScheduledState.RUNNING) {
+             running.add(outputPort);
+         }
+     }
+
+     return running;
}
private RemoteProcessGroup addRemoteProcessGroup(final ProcessGroup destination, final VersionedRemoteProcessGroup proposed, final ComponentIdGenerator componentIdGenerator) {
@@ -1607,6 +2628,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
destination.addRemoteProcessGroup(rpg);
updateRemoteProcessGroup(rpg, proposed, componentIdGenerator);
+ rpg.initialize();
return rpg;
}
@@ -1652,25 +2674,33 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
}
+ // Resolves the RPG input port matching the versioned port, using the RPG's input-port lookup and its current input ports.
private RemoteGroupPort getRpgInputPort(final VersionedRemoteGroupPort port, final RemoteProcessGroup rpg, final ComponentIdGenerator componentIdGenerator) {
- return getRpgPort(port, rpg, componentIdGenerator, RemoteProcessGroup::getInputPort);
+ return getRpgPort(port, rpg, componentIdGenerator, rpg::getInputPort, rpg.getInputPorts());
}
+ // Resolves the RPG output port matching the versioned port, using the RPG's output-port lookup and its current output ports.
private RemoteGroupPort getRpgOutputPort(final VersionedRemoteGroupPort port, final RemoteProcessGroup rpg, final ComponentIdGenerator componentIdGenerator) {
- return getRpgPort(port, rpg, componentIdGenerator, RemoteProcessGroup::getOutputPort);
+ return getRpgPort(port, rpg, componentIdGenerator, rpg::getOutputPort, rpg.getOutputPorts());
}
+ /**
+  * Finds the existing Remote Group Port that corresponds to the given versioned port. Lookups are
+  * tried in order: by the versioned port's instance identifier, then by port name among
+  * {@code ports}, and finally by the id produced by the component id generator.
+  *
+  * @return the matching port, or whatever {@code portLookup} returns for the generated id when
+  *         neither the instance id nor the name matches
+  */
private RemoteGroupPort getRpgPort(final VersionedRemoteGroupPort port, final RemoteProcessGroup rpg, final ComponentIdGenerator componentIdGenerator,
- final BiFunction<RemoteProcessGroup, String, RemoteGroupPort> portLookup) {
+ final Function<String, RemoteGroupPort> portLookup, final Set<RemoteGroupPort> ports) {
final String instanceId = port.getInstanceIdentifier();
if (instanceId != null) {
- final RemoteGroupPort remoteGroupPort = portLookup.apply(rpg, instanceId);
+ final RemoteGroupPort remoteGroupPort = portLookup.apply(instanceId);
if (remoteGroupPort != null) {
return remoteGroupPort;
}
}
+ // Fall back to matching by name when the instance id is absent or does not resolve to a port.
+ final Optional<RemoteGroupPort> portByName = ports.stream()
+ .filter(p -> p.getName().equals(port.getName()))
+ .findFirst();
+ if (portByName.isPresent()) {
+ return portByName.get();
+ }
+
+
final String portId = componentIdGenerator.generateUuid(port.getIdentifier(), port.getInstanceIdentifier(), rpg.getIdentifier());
- final RemoteGroupPort remoteGroupPort = portLookup.apply(rpg, portId);
+ final RemoteGroupPort remoteGroupPort = portLookup.apply(portId);
return remoteGroupPort;
}
@@ -1711,6 +2741,144 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
return descriptor;
}
+ /**
+  * Verifies that the connection can be synchronized. Only removal (no proposed connection) needs a
+  * check: the connection must be deletable once its queue is empty.
+  */
+ private void verifyCanSynchronize(final Connection connection, final VersionedConnection proposedConnection) throws FlowSynchronizationException {
+     final boolean removingConnection = proposedConnection == null;
+     if (!removingConnection) {
+         return;
+     }
+
+     verifyCanDeleteWhenQueueEmpty(connection);
+ }
+
+ /**
+  * Verifies that the connection's queue is either already empty or able to drain. A non-empty queue
+  * whose destination is DISABLED, STOPPED or STOPPING can never drain, so removal is rejected.
+  *
+  * @throws FlowSynchronizationException if data is queued and the destination is not running
+  */
+ private void verifyCanDeleteWhenQueueEmpty(final Connection connection) throws FlowSynchronizationException {
+     if (connection.getFlowFileQueue().isEmpty()) {
+         return;
+     }
+
+     final ScheduledState destinationState = connection.getDestination().getScheduledState();
+     final boolean destinationNotRunning = destinationState == ScheduledState.DISABLED
+         || destinationState == ScheduledState.STOPPED
+         || destinationState == ScheduledState.STOPPING;
+
+     if (destinationNotRunning) {
+         throw new FlowSynchronizationException("Cannot synchronize " + connection + " with proposed connection because doing so would require deleting the connection, " +
+             "and the connection has data queued while the destination is not running. The connection must be emptied before it can be removed.");
+     }
+ }
+
+ /**
+  * Finds the components that feed the given connection. Funnels are looked through: a funnel source
+  * is replaced by the sources of the funnel's own incoming connections.
+  *
+  * @param connection the connection whose sources are wanted, may be null
+  * @return the non-funnel upstream components; empty if {@code connection} is null
+  */
+ private Set<Connectable> getUpstreamComponents(final Connection connection) {
+     if (connection == null) {
+         return Collections.emptySet();
+     }
+
+     final Set<Connectable> components = new HashSet<>();
+     findUpstreamComponents(connection, components);
+     return components;
+ }
+
+ /**
+  * Adds the non-funnel sources upstream of the given connection to {@code components}.
+  */
+ private void findUpstreamComponents(final Connection connection, final Set<Connectable> components) {
+     findUpstreamComponents(connection, components, new HashSet<>());
+ }
+
+ /**
+  * Recursive worker. {@code visitedFunnels} records the funnels already expanded so that funnels
+  * connected in a cycle are each traversed once instead of recursing without bound.
+  */
+ private void findUpstreamComponents(final Connection connection, final Set<Connectable> components, final Set<Connectable> visitedFunnels) {
+     final Connectable source = connection.getSource();
+     if (source.getConnectableType() != ConnectableType.FUNNEL) {
+         components.add(source);
+         return;
+     }
+
+     // Set.add returns false when the funnel was already expanded earlier in this traversal.
+     if (!visitedFunnels.add(source)) {
+         return;
+     }
+
+     for (final Connection incoming : source.getIncomingConnections()) {
+         findUpstreamComponents(incoming, components, visitedFunnels);
+     }
+ }
+
+ /**
+  * Synchronizes a Connection with its proposed versioned definition. A null {@code proposedConnection}
+  * removes the connection (after waiting for its queue to empty), a null {@code connection} adds a new
+  * one to {@code group}, and otherwise the existing connection is updated. When both are null this is a no-op.
+  *
+  * Upstream components and the destination are stopped first. They are restarted afterwards only
+  * when the connection is not being removed.
+  *
+  * @param connection the existing connection, or null if it must be created
+  * @param proposedConnection the proposed definition, or null if the connection must be removed
+  * @param group the group that a newly created connection is added to
+  * @param synchronizationOptions supplies the stop timeout, timeout action and component id generator
+  * @throws FlowSynchronizationException if the connection cannot be removed, or the thread is interrupted while waiting
+  * @throws TimeoutException if components do not stop, or the queue does not drain, before the deadline
+  */
+ @Override
+ public void synchronize(final Connection connection, final VersionedConnection proposedConnection, final ProcessGroup group, final FlowSynchronizationOptions synchronizationOptions)
+         throws FlowSynchronizationException, TimeoutException {
+
+     if (connection == null && proposedConnection == null) {
+         return;
+     }
+
+     // Absolute deadline (epoch millis) shared by every wait performed below.
+     final long timeout = System.currentTimeMillis() + synchronizationOptions.getComponentStopTimeout().toMillis();
+
+     // Stop any upstream components so that we can update the connection
+     final Set<Connectable> upstream = getUpstreamComponents(connection);
+     Set<Connectable> stoppedComponents;
+     try {
+         stoppedComponents = stopOrTerminate(upstream, timeout, synchronizationOptions);
+     } catch (final TimeoutException te) {
+         if (synchronizationOptions.getComponentStopTimeoutAction() == FlowSynchronizationOptions.ComponentStopTimeoutAction.THROW_TIMEOUT_EXCEPTION) {
+             throw te;
+         }
+
+         LOG.info("Components upstream of {} did not stop in time. Will terminate {}", connection, upstream);
+         terminateComponents(upstream);
+         stoppedComponents = upstream;
+     }
+
+     try {
+         // Verify that we can synchronize the connection now that the sources are stopped.
+         verifyCanSynchronize(connection, proposedConnection);
+
+         // If the connection is to be deleted, wait for the queue to empty.
+         if (proposedConnection == null) {
+             try {
+                 // Wait only for the time left until the shared deadline, so that stopping the upstream
+                 // components and draining the queue together stay within the configured stop timeout.
+                 final long remainingMillis = Math.max(0L, timeout - System.currentTimeMillis());
+                 waitForQueueEmpty(connection, Duration.ofMillis(remainingMillis));
+             } catch (final InterruptedException ie) {
+                 Thread.currentThread().interrupt();
+                 throw new FlowSynchronizationException("Interrupted while waiting for FlowFile queue to empty for " + connection, ie);
+             }
+         }
+
+         // Stop destination component so that we can update the connection
+         if (connection != null) {
+             final Connectable destination = connection.getDestination();
+             final boolean stopped = stopOrTerminate(destination, timeout, synchronizationOptions);
+             if (stopped) {
+                 stoppedComponents.add(destination);
+             }
+         }
+
+         if (connection == null) {
+             final Connection added = addConnection(group, proposedConnection, synchronizationOptions.getComponentIdGenerator());
+             LOG.info("Successfully synchronized {} by adding it to the flow", added);
+         } else if (proposedConnection == null) {
+             connection.getProcessGroup().removeConnection(connection);
+             LOG.info("Successfully synchronized {} by removing it from the flow", connection);
+         } else {
+             updateConnection(connection, proposedConnection);
+             LOG.info("Successfully synchronized {} by updating it to match proposed version", connection);
+         }
+     } finally {
+         // If not removing the connection, restart any component that we stopped.
+         if (proposedConnection != null) {
+             for (final Connectable component : stoppedComponents) {
+                 context.getComponentScheduler().startComponent(component);
+             }
+         }
+     }
+ }
+
+ /**
+  * Polls the connection's FlowFile queue every 10 milliseconds until it is empty. Returns
+  * immediately when {@code connection} is null.
+  *
+  * @param duration maximum time to wait, measured from the moment this method is called
+  * @throws TimeoutException if the queue is still non-empty once the duration has elapsed
+  * @throws InterruptedException if interrupted while sleeping between polls
+  */
+ private void waitForQueueEmpty(final Connection connection, final Duration duration) throws TimeoutException, InterruptedException {
+     if (connection == null) {
+         return;
+     }
+
+     final FlowFileQueue queue = connection.getFlowFileQueue();
+     final long deadline = System.currentTimeMillis() + duration.toMillis();
+
+     while (true) {
+         if (queue.isEmpty()) {
+             return;
+         }
+
+         if (System.currentTimeMillis() >= deadline) {
+             throw new TimeoutException("Timed out waiting for " + connection + " to empty its FlowFiles");
+         }
+
+         Thread.sleep(10L);
+     }
+ }
+
+ /**
+  * Stops and then terminates every running processor in the given set. Components that are not
+  * processors, and processors that are not running, are left untouched.
+  */
+ private void terminateComponents(final Set<Connectable> components) {
+     for (final Connectable candidate : components) {
+         if (candidate instanceof ProcessorNode) {
+             final ProcessorNode processorNode = (ProcessorNode) candidate;
+             if (processorNode.isRunning()) {
+                 processorNode.getProcessGroup().stopProcessor(processorNode);
+                 processorNode.terminate();
+             }
+         }
+     }
+ }
private void updateConnection(final Connection connection, final VersionedConnection proposed) {
LOG.debug("Updating connection from {} to {} with name {} and relationships {}: {}",
@@ -1922,6 +3090,86 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
return null;
}
+ /**
+  * Synchronizes a Reporting Task with its proposed versioned definition. A null {@code proposed}
+  * removes the task, a null {@code reportingTask} creates a new one, and otherwise the existing task
+  * is updated. When both are null this is a no-op.
+  *
+  * The component scheduler is paused for the duration of the change and resumed afterwards.
+  *
+  * @param reportingTask the existing reporting task, or null if it must be created
+  * @param proposed the proposed definition, or null if the task must be removed
+  * @param synchronizationOptions supplies the component scheduler to pause and resume
+  */
+ @Override
+ public void synchronize(final ReportingTaskNode reportingTask, final VersionedReportingTask proposed, final FlowSynchronizationOptions synchronizationOptions)
+ throws FlowSynchronizationException, TimeoutException, InterruptedException {
+
+ if (reportingTask == null && proposed == null) {
+ return;
+ }
+
+ synchronizationOptions.getComponentScheduler().pause();
+ try {
+ // If reporting task is not null, make sure that it's stopped.
+ if (reportingTask != null && reportingTask.isRunning()) {
+ reportingTask.stop();
+ }
+
+ if (proposed == null) {
+ reportingTask.verifyCanDelete();
+ context.getFlowManager().removeReportingTask(reportingTask);
+ LOG.info("Successfully synchronized {} by removing it from the flow", reportingTask);
+ } else if (reportingTask == null) {
+ final ReportingTaskNode added = addReportingTask(proposed);
+ LOG.info("Successfully synchronized {} by adding it to the flow", added);
+ } else {
+ updateReportingTask(reportingTask, proposed);
+ LOG.info("Successfully synchronized {} by updating it to match proposed version", reportingTask);
+ }
+ } finally {
+ synchronizationOptions.getComponentScheduler().resume();
+ }
+ }
+
+ /**
+  * Creates a Reporting Task from the versioned definition, using its type, instance identifier and
+  * bundle coordinate, then applies the definition's configuration to the new task.
+  *
+  * @return the newly created reporting task node
+  */
+ private ReportingTaskNode addReportingTask(final VersionedReportingTask reportingTask) {
+     final BundleCoordinate bundleCoordinate = toCoordinate(reportingTask.getBundle());
+     final ReportingTaskNode createdNode = context.getFlowManager().createReportingTask(
+         reportingTask.getType(), reportingTask.getInstanceIdentifier(), bundleCoordinate, false);
+
+     updateReportingTask(createdNode, reportingTask);
+     return createdNode;
+ }
+
+ /**
+  * Applies the proposed definition to the given Reporting Task: name, comments, scheduling period and
+  * strategy, annotation data and properties, and finally the scheduled state. The validation trigger
+  * is paused while the changes are applied and resumed afterwards, even if an update fails.
+  */
+ private void updateReportingTask(final ReportingTaskNode reportingTask, final VersionedReportingTask proposed) {
+ LOG.debug("Updating Reporting Task {}", reportingTask);
+
+ reportingTask.pauseValidationTrigger();
+ try {
+ reportingTask.setName(proposed.getName());
+ reportingTask.setComments(proposed.getComments());
+ reportingTask.setSchedulingPeriod(proposed.getSchedulingPeriod());
+ reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(proposed.getSchedulingStrategy()));
+
+ reportingTask.setAnnotationData(proposed.getAnnotationData());
+ reportingTask.setProperties(proposed.getProperties());
+
+ // enable/disable/start according to the ScheduledState
+ switch (proposed.getScheduledState()) {
+ case DISABLED:
+ // A running task is stopped before it is disabled.
+ if (reportingTask.isRunning()) {
+ reportingTask.stop();
+ }
+ reportingTask.disable();
+ break;
+ case ENABLED:
+ // ENABLED means "not disabled and not running": enable a disabled task, stop a running one.
+ if (reportingTask.getScheduledState() == org.apache.nifi.controller.ScheduledState.DISABLED) {
+ reportingTask.enable();
+ } else if (reportingTask.isRunning()) {
+ reportingTask.stop();
+ }
+ break;
+ case RUNNING:
+ // A disabled task is enabled first, then started if it is not already running.
+ if (reportingTask.getScheduledState() == org.apache.nifi.controller.ScheduledState.DISABLED) {
+ reportingTask.enable();
+ }
+ if (!reportingTask.isRunning()) {
+ reportingTask.start();
+ }
+ break;
+ }
+ } finally {
+ reportingTask.resumeValidationTrigger();
+ }
+ }
+
private <T extends org.apache.nifi.components.VersionedComponent & Connectable> boolean matchesId(final T component, final String id) {
return id.equals(component.getIdentifier()) || id.equals(component.getVersionedComponentId().orElse(NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())));
}
@@ -1931,7 +3179,6 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
NiFiRegistryFlowMapper.generateVersionedComponentId(group.getIdentifier())).equals(groupId);
}
-
private void findAllProcessors(final VersionedProcessGroup group, final Map<String, VersionedProcessor> map) {
for (final VersionedProcessor processor : group.getProcessors()) {
map.put(processor.getIdentifier(), processor);
@@ -2026,7 +3273,6 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
}
}
-
private Set<String> getKnownVariableNames(final ProcessGroup group) {
final Set<String> variableNames = new HashSet<>();
populateKnownVariableNames(group, variableNames);
@@ -2059,4 +3305,5 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
return getVersionedControllerService(group.getParent(), versionedComponentId);
}
+
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/VersionedComponentSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/VersionedComponentSynchronizer.java
new file mode 100644
index 0000000000..04b2491cd2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/VersionedComponentSynchronizer.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.flow.synchronization;
+
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.exception.ProcessorInstantiationException;
+import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedExternalFlow;
+import org.apache.nifi.flow.VersionedFunnel;
+import org.apache.nifi.flow.VersionedLabel;
+import org.apache.nifi.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedPort;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flow.VersionedRemoteProcessGroup;
+import org.apache.nifi.flow.VersionedReportingTask;
+import org.apache.nifi.groups.FlowSynchronizationOptions;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.parameter.ParameterContext;
+
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Synchronizes components of a flow with a proposed versioned representation. Overloads are declared for an entire Process Group as well as for individual components
+ * (Processor, Controller Service, Connection, Port, Funnel, Label, Reporting Task, Remote Process Group and Parameter Context) and for the settings of a Process Group.
+ */
+public interface VersionedComponentSynchronizer {
+
+ /**
+ * Synchronize the given Process Group to match the proposed flow
+ * @param group the Process Group to update
+ * @param proposedFlow the proposed/desired state for the process group
+ * @param synchronizationOptions options for how to synchronize the group
+ * @throws ProcessorInstantiationException NOTE(review): presumably if a Processor required by the proposed flow cannot be instantiated - confirm against the implementation
+ */
+ void synchronize(ProcessGroup group, VersionedExternalFlow proposedFlow, FlowSynchronizationOptions synchronizationOptions) throws ProcessorInstantiationException;
+
+ /**
+ * Verifies that the given Process Group can be updated to match the proposed flow
+ * @param group the group to update
+ * @param proposed the proposed updated version
+ * @param verifyConnectionRemoval if <code>true</code> and synchronizing the Process Group would result in any Connection being removed, an IllegalStateException will be thrown if that
+ * Connection has data in it. If <code>false</code>, the presence of data in removed queues will be ignored.
+ */
+ void verifyCanSynchronize(ProcessGroup group, VersionedProcessGroup proposed, boolean verifyConnectionRemoval);
+
+
+ /**
+ * Synchronizes the given Processor to match the proposed snapshot, or deletes the Processor if the proposed snapshot is <code>null</code>. If the given processor is <code>null</code>,
+ * adds the processor to the given ProcessGroup
+ *
+ * @param processor the processor to synchronize
+ * @param proposedProcessor the proposed/desired state for the processor
+ * @param group the ProcessGroup to which the ProcessorNode should belong.
+ * @param synchronizationOptions options for how to synchronize the flow
+ *
+ * @throws FlowSynchronizationException if unable to synchronize the processor with the proposed version
+ * @throws TimeoutException if the processor must be stopped in order to synchronize it with the proposed version and stopping takes longer than the timeout allowed by the
+ * {@link FlowSynchronizationOptions synchronization options}.
+ * @throws InterruptedException if interrupted while waiting for processor to stop or outbound connections to empty if processor is being removed
+ */
+ void synchronize(ProcessorNode processor, VersionedProcessor proposedProcessor, ProcessGroup group, FlowSynchronizationOptions synchronizationOptions)
+ throws FlowSynchronizationException, TimeoutException, InterruptedException;
+
+ /**
+ * Synchronizes the given Controller Service to match the proposed version, or deletes the Controller Service if the proposed snapshot is <code>null</code>. If the given Controller Service is
+ * <code>null</code>, adds the Controller Service to the given Process Group
+ *
+ * @param controllerService the Controller Service to synchronize
+ * @param proposed the proposed/desired state for the controller service
+ * @param group the ProcessGroup to which the Controller Service should belong
+ * @param synchronizationOptions options for how to synchronize the flow
+ *
+ * @throws FlowSynchronizationException if unable to synchronize the Controller Service with the proposed version
+ * @throws TimeoutException if the Controller Service must be disabled in order to synchronize it with the proposed version and disabling takes longer than the timeout allowed by the
+ * {@link FlowSynchronizationOptions synchronization options}.
+ * @throws InterruptedException if interrupted while waiting for Controller Service to disable
+ */
+ void synchronize(ControllerServiceNode controllerService, VersionedControllerService proposed, ProcessGroup group, FlowSynchronizationOptions synchronizationOptions)
+ throws FlowSynchronizationException, TimeoutException, InterruptedException;
+
+ /**
+ * Synchronizes the given Connection to match the proposed one, or deletes the Connection if the proposed is <code>null</code>. If the given connection is <code>null</code>, adds the connection
+ * to the given ProcessGroup
+ * @param connection the connection to synchronize
+ * @param proposedConnection the proposed/desired state for the connection
+ * @param group the ProcessGroup to which the connection should belong
+ * @param synchronizationOptions options for how to synchronize the flow
+ *
+ * @throws IllegalStateException if the connection cannot be updated due to the state of the flow
+ * @throws FlowSynchronizationException if unable to synchronize the connection with the proposed version
+ * @throws TimeoutException if the source or destination of the connection must be stopped in order to perform the synchronization and stopping it takes longer than the timeout allowed by the
+ * {@link FlowSynchronizationOptions synchronization options}.
+ */
+ void synchronize(Connection connection, VersionedConnection proposedConnection, ProcessGroup group, FlowSynchronizationOptions synchronizationOptions)
+ throws FlowSynchronizationException, TimeoutException;
+
+
+ /**
+ * Synchronizes the given Port to match the proposed one, or deletes the Port if the proposed is <code>null</code>. If the given Port is <code>null</code>, creates it and adds it to the given
+ * ProcessGroup
+ *
+ * @param port the port to synchronize
+ * @param proposed the proposed/desired state for the port
+ * @param group the ProcessGroup to which the port should belong
+ * @param synchronizationOptions options for how to synchronize the flow
+ *
+ * @throws IllegalStateException if the port cannot be updated due to the state of the flow
+ * @throws FlowSynchronizationException if unable to synchronize the port with the proposed version
+ * @throws TimeoutException if the port is running and takes longer to stop than the timeout allowed by the {@link FlowSynchronizationOptions synchronization options}.
+ * @throws InterruptedException if interrupted while waiting during synchronization (presumably while waiting for the port to stop - TODO confirm)
+ */
+ void synchronize(Port port, VersionedPort proposed, ProcessGroup group, FlowSynchronizationOptions synchronizationOptions) throws FlowSynchronizationException, TimeoutException,
+ InterruptedException;
+
+
+ /**
+ * Synchronizes the given Funnel to match the proposed one, or deletes the Funnel if the proposed is <code>null</code>. If the given Funnel is <code>null</code>, creates it and adds it to the
+ * given ProcessGroup
+ *
+ * @param funnel the funnel to synchronize
+ * @param proposed the proposed/desired state for the funnel
+ * @param group the ProcessGroup to which the funnel should belong
+ * @param synchronizationOptions options for how to synchronize the flow
+ *
+ * @throws IllegalStateException if the funnel cannot be updated due to the state of the flow
+ * @throws FlowSynchronizationException if unable to synchronize the funnel with the proposed version
+ * @throws TimeoutException if the funnel is being removed and downstream components take longer to stop than the timeout allowed by the {@link FlowSynchronizationOptions synchronization options}.
+ * @throws InterruptedException if interrupted while waiting during synchronization (presumably while waiting for downstream components to stop - TODO confirm)
+ */
+ void synchronize(Funnel funnel, VersionedFunnel proposed, ProcessGroup group, FlowSynchronizationOptions synchronizationOptions) throws FlowSynchronizationException, TimeoutException,
+ InterruptedException;
+
+ /**
+ * Synchronizes the given Label to match the proposed one, or deletes the Label if the proposed is <code>null</code>. If the given Label is <code>null</code>, creates it and adds it to the
+ * given ProcessGroup
+ *
+ * @param label the label to synchronize
+ * @param proposed the proposed/desired state for the label
+ * @param group the ProcessGroup to which the label should belong
+ * @param synchronizationOptions options for how to synchronize the flow
+ */
+ void synchronize(Label label, VersionedLabel proposed, ProcessGroup group, FlowSynchronizationOptions synchronizationOptions);
+
+ /**
+ * Synchronizes the given Reporting Task to match the proposed one, or deletes the Reporting Task if the proposed is <code>null</code>. If the given Reporting Task is <code>null</code>,
+ * creates it
+ *
+ * @param reportingTask the reporting task to synchronize
+ * @param proposed the proposed/desired state
+ * @param synchronizationOptions options for how to synchronize the flow
+ *
+ * @throws IllegalStateException if the reporting task cannot be updated due to the state of the flow
+ * @throws FlowSynchronizationException if unable to synchronize the reporting task with the proposed version
+ * @throws TimeoutException if the reporting task is being removed and takes longer to stop than the timeout allowed by the {@link FlowSynchronizationOptions synchronization options}.
+ * @throws InterruptedException if interrupted while waiting during synchronization (presumably while waiting for the reporting task to stop - TODO confirm)
+ */
+ void synchronize(ReportingTaskNode reportingTask, VersionedReportingTask proposed, FlowSynchronizationOptions synchronizationOptions)
+ throws FlowSynchronizationException, TimeoutException, InterruptedException;
+
+
+ /**
+ * Synchronizes the given Remote Process Group to match the proposed one, or deletes the rpg if the proposed is <code>null</code>. If the given rpg is <code>null</code>, creates it and adds
+ * it to the given ProcessGroup
+ *
+ * @param rpg the rpg to synchronize
+ * @param proposed the proposed/desired state for the rpg
+ * @param group the ProcessGroup to which the rpg should belong
+ * @param synchronizationOptions options for how to synchronize the flow
+ *
+ * @throws IllegalStateException if the rpg cannot be updated due to the state of the flow
+ * @throws FlowSynchronizationException if unable to synchronize the rpg with the proposed version
+ * @throws TimeoutException if the rpg is being removed and takes longer to stop than the timeout allowed by the {@link FlowSynchronizationOptions synchronization options}.
+ * @throws InterruptedException if interrupted while waiting during synchronization (presumably while waiting for the rpg to stop - TODO confirm)
+ */
+ void synchronize(RemoteProcessGroup rpg, VersionedRemoteProcessGroup proposed, ProcessGroup group, FlowSynchronizationOptions synchronizationOptions)
+ throws FlowSynchronizationException, TimeoutException, InterruptedException;
+
+
+ /**
+ * Synchronizes the given Parameter Context with the proposed version.
+ * NOTE(review): presumably follows the same convention as the other overloads (a <code>null</code> proposed version deletes the Parameter Context, a <code>null</code> Parameter Context
+ * creates it) - confirm against the implementation.
+ *
+ * @param parameterContext the parameter context to synchronize
+ * @param proposed the proposed/desired state for the parameter context
+ * @param synchronizationOptions options for how to synchronize the flow
+ *
+ * @throws FlowSynchronizationException if unable to synchronize the parameter context with the proposed version
+ * @throws TimeoutException NOTE(review): presumably if the synchronization takes longer than the timeout allowed by the {@link FlowSynchronizationOptions synchronization options} - confirm
+ * @throws InterruptedException if interrupted while waiting during synchronization
+ */
+ void synchronize(ParameterContext parameterContext, VersionedParameterContext proposed, FlowSynchronizationOptions synchronizationOptions)
+ throws FlowSynchronizationException, TimeoutException, InterruptedException;
+
+ /**
+ * Synchronizes the settings of the given Process Group with the proposed version.
+ * NOTE(review): judging by the method name, only the group's own settings (not its contents) are synchronized, and <code>parentGroup</code> is presumably the group to which the
+ * Process Group should belong - confirm against the implementation.
+ *
+ * @param processGroup the process group whose settings should be synchronized
+ * @param proposed the proposed/desired state for the process group
+ * @param parentGroup the parent ProcessGroup of the process group being synchronized
+ * @param synchronizationOptions options for how to synchronize the flow
+ *
+ * @throws FlowSynchronizationException if unable to synchronize the process group with the proposed version
+ * @throws TimeoutException NOTE(review): presumably if the synchronization takes longer than the timeout allowed by the {@link FlowSynchronizationOptions synchronization options} - confirm
+ * @throws InterruptedException if interrupted while waiting during synchronization
+ * @throws ProcessorInstantiationException NOTE(review): presumably if a Processor required by the proposed version cannot be instantiated - confirm against the implementation
+ */
+ void synchronizeProcessGroupSettings(ProcessGroup processGroup, VersionedProcessGroup proposed, ProcessGroup parentGroup, FlowSynchronizationOptions synchronizationOptions)
+ throws FlowSynchronizationException, TimeoutException, InterruptedException, ProcessorInstantiationException;
+
+
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/VersionedFlowSynchronizationContext.java
similarity index 93%
rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizationContext.java
rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/VersionedFlowSynchronizationContext.java
index 4e18031bfd..ebb9ec3903 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/VersionedFlowSynchronizationContext.java
@@ -15,12 +15,14 @@
* limitations under the License.
*/
-package org.apache.nifi.groups;
+package org.apache.nifi.flow.synchronization;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReloadComponent;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.groups.ComponentIdGenerator;
+import org.apache.nifi.groups.ComponentScheduler;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.registry.flow.FlowRegistryClient;
@@ -30,7 +32,7 @@ import java.util.function.Function;
import static java.util.Objects.requireNonNull;
-public class ProcessGroupSynchronizationContext {
+public class VersionedFlowSynchronizationContext {
private final ComponentIdGenerator componentIdGenerator;
private final FlowManager flowManager;
private final FlowRegistryClient flowRegistryClient;
@@ -42,7 +44,7 @@ public class ProcessGroupSynchronizationContext {
private final Function<ProcessorNode, ProcessContext> processContextFactory;
- private ProcessGroupSynchronizationContext(final Builder builder) {
+ private VersionedFlowSynchronizationContext(final Builder builder) {
this.componentIdGenerator = builder.componentIdGenerator;
this.flowManager = builder.flowManager;
this.flowRegistryClient = builder.flowRegistryClient;
@@ -146,7 +148,7 @@ public class ProcessGroupSynchronizationContext {
return this;
}
- public ProcessGroupSynchronizationContext build() {
+ public VersionedFlowSynchronizationContext build() {
requireNonNull(componentIdGenerator, "Component ID Generator must be set");
requireNonNull(flowManager, "Flow Manager must be set");
requireNonNull(flowRegistryClient, "Flow Registry Client must be set");
@@ -156,7 +158,7 @@ public class ProcessGroupSynchronizationContext {
requireNonNull(componentScheduler, "Component Scheduler must be set");
requireNonNull(flowMappingOptions, "Flow Mapping Options must be set");
requireNonNull(processContextFactory, "Process Context Factory must be set");
- return new ProcessGroupSynchronizationContext(this);
+ return new VersionedFlowSynchronizationContext(this);
}
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/DefaultComponentScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/DefaultComponentScheduler.java
index a54ec61ab6..32181b7413 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/DefaultComponentScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/DefaultComponentScheduler.java
@@ -23,6 +23,7 @@ import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.registry.flow.mapping.VersionedComponentStateLookup;
+import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.remote.RemoteGroupPort;
import java.util.Collection;
@@ -63,4 +64,8 @@ public class DefaultComponentScheduler extends AbstractComponentScheduler {
protected void enableNow(final Collection<ControllerServiceNode> controllerServices) {
getControllerServiceProvider().enableControllerServices(controllerServices);
}
+
+ /**
+ * Starts the given Reporting Task immediately by invoking its <code>start()</code> method.
+ * NOTE(review): this looks like the implementation of a hook declared by AbstractComponentScheduler, yet it carries no <code>@Override</code> - confirm and annotate if so.
+ *
+ * @param reportingTask the Reporting Task to start
+ */
+ protected void startNow(final ReportingTaskNode reportingTask) {
+ reportingTask.start();
+ }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizer.java
deleted file mode 100644
index 25cea8802b..0000000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizer.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.groups;
-
-import org.apache.nifi.controller.exception.ProcessorInstantiationException;
-import org.apache.nifi.flow.VersionedExternalFlow;
-import org.apache.nifi.flow.VersionedProcessGroup;
-
-public interface ProcessGroupSynchronizer {
-
- /**
- * Synchronize the given Process Group to match the proposed snaphsot
- * @param group the Process Group to update
- * @param proposedFlow the proposed/desired state for the process group
- * @param synchronizationOptions options for how to synchronize the group
- */
- void synchronize(ProcessGroup group, VersionedExternalFlow proposedFlow, GroupSynchronizationOptions synchronizationOptions) throws ProcessorInstantiationException;
-
- void verifyCanSynchronize(ProcessGroup group, VersionedProcessGroup proposed, boolean verifyConnectionRemoval);
-
-}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/RetainExistingStateComponentScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/RetainExistingStateComponentScheduler.java
index b6ed741889..bafb091d47 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/RetainExistingStateComponentScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/RetainExistingStateComponentScheduler.java
@@ -18,6 +18,7 @@
package org.apache.nifi.groups;
import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.Triggerable;
import org.apache.nifi.controller.service.ControllerServiceNode;
@@ -100,6 +101,11 @@ public class RetainExistingStateComponentScheduler implements ComponentScheduler
delegate.disableControllerServicesAsync(controllerServices);
}
+ // Reporting Tasks are forwarded to the wrapped scheduler unconditionally; this method performs no check of its own before delegating.
+ @Override
+ public void startReportingTask(final ReportingTaskNode reportingTask) {
+ delegate.startReportingTask(reportingTask);
+ }
+
@Override
public void pause() {
delegate.pause();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 1924f9bcfb..8960cefad4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -66,6 +66,8 @@ import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.synchronization.StandardVersionedComponentSynchronizer;
+import org.apache.nifi.flow.synchronization.VersionedFlowSynchronizationContext;
import org.apache.nifi.logging.LogRepository;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.nar.ExtensionManager;
@@ -3787,7 +3789,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final ComponentScheduler defaultComponentScheduler = new DefaultComponentScheduler(controllerServiceProvider, stateLookup);
final ComponentScheduler retainExistingStateScheduler = new RetainExistingStateComponentScheduler(this, defaultComponentScheduler);
- final GroupSynchronizationOptions synchronizationOptions = new GroupSynchronizationOptions.Builder()
+ final FlowSynchronizationOptions synchronizationOptions = new FlowSynchronizationOptions.Builder()
.componentIdGenerator(idGenerator)
.componentScheduler(retainExistingStateScheduler)
.ignoreLocalModifications(!verifyNotDirty)
@@ -3818,14 +3820,14 @@ public final class StandardProcessGroup implements ProcessGroup {
}
@Override
- public void synchronizeFlow(final VersionedExternalFlow proposedSnapshot, final GroupSynchronizationOptions synchronizationOptions, final FlowMappingOptions flowMappingOptions) {
+ public void synchronizeFlow(final VersionedExternalFlow proposedSnapshot, final FlowSynchronizationOptions synchronizationOptions, final FlowMappingOptions flowMappingOptions) {
writeLock.lock();
try {
verifyCanUpdate(proposedSnapshot, true, !synchronizationOptions.isIgnoreLocalModifications());
- final ProcessGroupSynchronizationContext groupSynchronizationContext = createGroupSynchronizationContext(
+ final VersionedFlowSynchronizationContext groupSynchronizationContext = createGroupSynchronizationContext(
synchronizationOptions.getComponentIdGenerator(), synchronizationOptions.getComponentScheduler(), flowMappingOptions);
- final StandardProcessGroupSynchronizer synchronizer = new StandardProcessGroupSynchronizer(groupSynchronizationContext);
+ final StandardVersionedComponentSynchronizer synchronizer = new StandardVersionedComponentSynchronizer(groupSynchronizationContext);
final StandardVersionControlInformation originalVci = this.versionControlInfo.get();
try {
@@ -3963,9 +3965,9 @@ public final class StandardProcessGroup implements ProcessGroup {
}
final ComponentIdGenerator componentIdGenerator = (proposedId, instanceId, destinationGroupId) -> proposedId;
- final ProcessGroupSynchronizationContext groupSynchronizationContext = createGroupSynchronizationContext(
+ final VersionedFlowSynchronizationContext groupSynchronizationContext = createGroupSynchronizationContext(
componentIdGenerator, ComponentScheduler.NOP_SCHEDULER, FlowMappingOptions.DEFAULT_OPTIONS);
- final StandardProcessGroupSynchronizer synchronizer = new StandardProcessGroupSynchronizer(groupSynchronizationContext);
+ final StandardVersionedComponentSynchronizer synchronizer = new StandardVersionedComponentSynchronizer(groupSynchronizationContext);
synchronizer.verifyCanSynchronize(this, updatedFlow.getFlowContents(), verifyConnectionRemoval);
} finally {
@@ -3973,9 +3975,9 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
- private ProcessGroupSynchronizationContext createGroupSynchronizationContext(final ComponentIdGenerator componentIdGenerator, final ComponentScheduler componentScheduler,
- final FlowMappingOptions flowMappingOptions) {
- return new ProcessGroupSynchronizationContext.Builder()
+ private VersionedFlowSynchronizationContext createGroupSynchronizationContext(final ComponentIdGenerator componentIdGenerator, final ComponentScheduler componentScheduler,
+ final FlowMappingOptions flowMappingOptions) {
+ return new VersionedFlowSynchronizationContext.Builder()
.componentIdGenerator(componentIdGenerator)
.flowManager(flowManager)
.flowRegistryClient(flowRegistryClient)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index 3c0954d2e2..1eb226701b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -497,7 +497,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
if (sendPort == null) {
sendPort = addInputPort(descriptor);
logger.info("Added Input Port {} with Name {} and Target Identifier {} to {}", sendPort.getIdentifier(), sendPort.getName(), sendPort.getTargetIdentifier(), this);
- } else {
+ } else if (descriptor.getTargetId() != null) {
final String previousTargetId = sendPort.getTargetIdentifier();
sendPort.setTargetIdentifier(descriptor.getTargetId());
logger.info("Updated Target identifier for Input Port with Name {} from {} to {} for {}", descriptor.getName(), previousTargetId, descriptor.getTargetId(), this);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java
new file mode 100644
index 0000000000..235cb4df82
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java
@@ -0,0 +1,1025 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.flow.synchronization;
+
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.ComponentNode;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReloadComponent;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.LoadBalanceStrategy;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.ControllerServiceReference;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ComponentType;
+import org.apache.nifi.flow.ConnectableComponent;
+import org.apache.nifi.flow.ConnectableComponentType;
+import org.apache.nifi.flow.Position;
+import org.apache.nifi.flow.ScheduledState;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedParameter;
+import org.apache.nifi.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedPort;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.groups.ComponentIdGenerator;
+import org.apache.nifi.groups.ComponentScheduler;
+import org.apache.nifi.groups.FlowSynchronizationOptions;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.parameter.Parameter;
+import org.apache.nifi.parameter.ParameterContext;
+import org.apache.nifi.parameter.ParameterContextManager;
+import org.apache.nifi.parameter.ParameterDescriptor;
+import org.apache.nifi.parameter.ParameterReferenceManager;
+import org.apache.nifi.parameter.StandardParameterContext;
+import org.apache.nifi.parameter.StandardParameterContextManager;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.mapping.FlowMappingOptions;
+import org.apache.nifi.scheduling.ExecutionNode;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.anySet;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class StandardVersionedComponentSynchronizerTest {
+
+ private ProcessorNode processorA;
+ private ProcessorNode processorB;
+ private Connection connectionAB;
+ private Port inputPort;
+ private Port outputPort;
+ private StandardVersionedComponentSynchronizer synchronizer;
+ private FlowSynchronizationOptions synchronizationOptions;
+ private ProcessGroup group;
+ private ComponentScheduler componentScheduler;
+ private ComponentIdGenerator componentIdGenerator;
+ private ControllerServiceProvider controllerServiceProvider;
+ private ParameterContextManager parameterContextManager;
+ private ParameterReferenceManager parameterReferenceManager;
+
+ private final Set<String> queuesWithData = Collections.synchronizedSet(new HashSet<>());
+ private final Bundle bundle = new Bundle("group", "artifact", "version 1.0");
+
+ @Before
+ public void setup() { // Builds the mocks, a small mocked flow (two processors, two ports, one connection) and the synchronizer under test
+ final ExtensionManager extensionManager = Mockito.mock(ExtensionManager.class);
+ final FlowManager flowManager = Mockito.mock(FlowManager.class);
+ controllerServiceProvider = Mockito.mock(ControllerServiceProvider.class);
+ final Function<ProcessorNode, ProcessContext> processContextFactory = proc -> Mockito.mock(ProcessContext.class);
+ final ReloadComponent reloadComponent = Mockito.mock(ReloadComponent.class);
+ final FlowRegistryClient flowRegistryClient = Mockito.mock(FlowRegistryClient.class);
+ componentIdGenerator = (proposed, instance, group) -> proposed == null ? instance : proposed; // use the proposed id when present, otherwise the instance id
+ componentScheduler = Mockito.mock(ComponentScheduler.class);
+ parameterContextManager = new StandardParameterContextManager(); // a real manager, so tests can look up the contexts that get created
+ parameterReferenceManager = Mockito.mock(ParameterReferenceManager.class);
+
+ when(flowManager.createControllerService(anyString(), anyString(), any(BundleCoordinate.class), anySet(), anyBoolean(), anyBoolean(), nullable(String.class)))
+ .thenReturn(Mockito.mock(ControllerServiceNode.class));
+ when(flowManager.getParameterContextManager()).thenReturn(parameterContextManager);
+ doAnswer(invocation -> {
+ invocation.getArgument(0, Runnable.class).run(); // run the supplied task immediately, on the calling thread
+ return null;
+ }).when(flowManager).withParameterContextResolution(any(Runnable.class));
+ doAnswer(invocation -> { // createParameterContext builds a real StandardParameterContext and registers it with the manager
+ final String id = invocation.getArgument(0, String.class);
+ final String name = invocation.getArgument(1, String.class);
+ final ParameterContext parameterContext = new StandardParameterContext(id, name, parameterReferenceManager, null);
+
+ final Map<String, Parameter> parameterMap = invocation.getArgument(2, Map.class);
+ parameterContext.setParameters(parameterMap);
+
+ final List<String> inheritedContextIds = invocation.getArgument(3, List.class);
+ final List<ParameterContext> inheritedContexts = inheritedContextIds.stream()
+ .map(parameterContextManager::getParameterContext) // resolve each inherited id through the manager
+ .collect(Collectors.toList());
+ parameterContext.setInheritedParameterContexts(inheritedContexts);
+
+ parameterContextManager.addParameterContext(parameterContext);
+
+ return parameterContext;
+ }).when(flowManager).createParameterContext(anyString(), anyString(), anyMap(), anyList());
+
+ final VersionedFlowSynchronizationContext context = new VersionedFlowSynchronizationContext.Builder()
+ .componentIdGenerator(componentIdGenerator)
+ .componentScheduler(componentScheduler)
+ .extensionManager(extensionManager)
+ .flowManager(flowManager)
+ .controllerServiceProvider(controllerServiceProvider)
+ .flowMappingOptions(FlowMappingOptions.DEFAULT_OPTIONS)
+ .processContextFactory(processContextFactory)
+ .reloadComponent(reloadComponent)
+ .flowRegistryClient(flowRegistryClient)
+ .build();
+
+ group = Mockito.mock(ProcessGroup.class); // must be assigned before the component mocks below, which are stubbed to return it
+
+ processorA = createMockProcessor();
+ processorB = createMockProcessor();
+ inputPort = createMockPort(ConnectableType.INPUT_PORT);
+ outputPort = createMockPort(ConnectableType.OUTPUT_PORT);
+ connectionAB = createMockConnection(processorA, processorB, group); // also stubs the connection lists of A, B and the group
+
+ when(group.getProcessors()).thenReturn(Arrays.asList(processorA, processorB));
+ when(group.getInputPorts()).thenReturn(Collections.singleton(inputPort));
+ when(group.getOutputPorts()).thenReturn(Collections.singleton(outputPort));
+
+ synchronizationOptions = new FlowSynchronizationOptions.Builder() // default options; no stop timeout or timeout action is set explicitly
+ .componentIdGenerator(componentIdGenerator)
+ .componentScheduler(componentScheduler)
+ .build();
+
+ synchronizer = new StandardVersionedComponentSynchronizer(context);
+
+ queuesWithData.clear(); // every mocked queue reports itself as empty at the start of a test
+ }
+
+ private FlowSynchronizationOptions createQuickFailSynchronizationOptions(final FlowSynchronizationOptions.ComponentStopTimeoutAction timeoutAction) {
+ return new FlowSynchronizationOptions.Builder()
+ .componentIdGenerator(componentIdGenerator)
+ .componentScheduler(componentScheduler)
+ .componentStopTimeout(Duration.ofMillis(10))
+ .componentStopTimeoutAction(timeoutAction)
+ .build();
+ }
+
+ private ProcessorNode createMockProcessor() {
+ // A stopped, non-running processor mock with a random identifier that reports the shared mock group as its parent.
+ final ProcessorNode mockProcessor = Mockito.mock(ProcessorNode.class);
+ instrumentComponentNodeMethods(UUID.randomUUID().toString(), mockProcessor);
+
+ when(mockProcessor.getConnectableType()).thenReturn(ConnectableType.PROCESSOR);
+ when(mockProcessor.getProcessGroup()).thenReturn(group);
+ when(mockProcessor.getScheduledState()).thenReturn(org.apache.nifi.controller.ScheduledState.STOPPED);
+ when(mockProcessor.isRunning()).thenReturn(false);
+
+ return mockProcessor;
+ }
+
+ private ControllerServiceNode createMockControllerService() {
+ // A disabled, inactive controller service mock with a random identifier
+ // that reports the shared mock group as its parent.
+ final ControllerServiceNode mockService = Mockito.mock(ControllerServiceNode.class);
+ instrumentComponentNodeMethods(UUID.randomUUID().toString(), mockService);
+
+ when(mockService.getState()).thenReturn(ControllerServiceState.DISABLED);
+ when(mockService.getProcessGroup()).thenReturn(group);
+ when(mockService.isActive()).thenReturn(false);
+
+ return mockService;
+ }
+
+ private void instrumentComponentNodeMethods(final String uuid, final ComponentNode component) {
+ when(component.getIdentifier()).thenReturn(uuid);
+ when(component.getProperties()).thenReturn(Collections.emptyMap());
+ when(component.getPropertyDescriptor(anyString())).thenAnswer(invocation -> {
+ return new PropertyDescriptor.Builder()
+ .name(invocation.getArgument(0, String.class))
+ .build();
+ });
+ when(component.getBundleCoordinate()).thenReturn(new BundleCoordinate("group", "artifact", "version 1.0"));
+ }
+
+ private Port createMockPort(final ConnectableType connectableType) {
+ // A non-running port mock of the requested type with a random identifier, parented by the shared mock group.
+ final Port mockPort = Mockito.mock(Port.class);
+
+ when(mockPort.getConnectableType()).thenReturn(connectableType);
+ when(mockPort.getProcessGroup()).thenReturn(group);
+ when(mockPort.isRunning()).thenReturn(false);
+ when(mockPort.getIdentifier()).thenReturn(UUID.randomUUID().toString());
+
+ return mockPort;
+ }
+
+
+ private Connection createMockConnection(final Connectable source, final Connectable destination, final ProcessGroup group) {
+ final String uuid = UUID.randomUUID().toString(); // shared by the connection and its queue
+
+ final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+ when(flowFileQueue.getIdentifier()).thenReturn(uuid);
+ when(flowFileQueue.isEmpty()).thenAnswer(invocation -> !queuesWithData.contains(uuid)); // evaluated on each call, so tests toggle emptiness through queuesWithData
+
+ final Connection connection = Mockito.mock(Connection.class);
+ when(connection.getIdentifier()).thenReturn(uuid);
+ when(connection.getSource()).thenReturn(source);
+ when(connection.getDestination()).thenReturn(destination);
+ when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
+ when(connection.getProcessGroup()).thenReturn(group);
+
+ // Re-stub the source's outgoing connections: a copy of whatever it currently returns, plus the new connection
+ final Set<Connection> outgoing = source.getConnections();
+ final Set<Connection> updatedOutgoing = outgoing == null ? new HashSet<>() : new HashSet<>(outgoing);
+ updatedOutgoing.add(connection);
+ when(source.getConnections()).thenReturn(updatedOutgoing);
+
+ // Re-stub the destination's incoming connections in the same copy-and-append manner
+ final List<Connection> incoming = destination.getIncomingConnections();
+ final List<Connection> updatedIncoming = incoming == null ? new ArrayList<>() : new ArrayList<>(incoming);
+ updatedIncoming.add(connection);
+ when(destination.getIncomingConnections()).thenReturn(updatedIncoming);
+
+ // Re-stub the group so that its connections include the new connection as well
+ final Set<Connection> currentConnections = group.getConnections();
+ final Set<Connection> updatedConnections = currentConnections == null ? new HashSet<>() : new HashSet<>(currentConnections);
+ updatedConnections.add(connection);
+ when(group.getConnections()).thenReturn(updatedConnections);
+
+ return connection;
+ }
+
+ @Test
+ public void testSynchronizeStoppedProcessor() throws FlowSynchronizationException, TimeoutException, InterruptedException {
+ final VersionedProcessor proposed = createMinimalVersionedProcessor();
+ synchronizer.synchronize(processorA, proposed, group, synchronizationOptions);
+
+ // The proposed name and properties are applied exactly once, and no component is started.
+ verify(componentScheduler, Mockito.never()).startComponent(any(Connectable.class));
+ verify(processorA, times(1)).setName(proposed.getName());
+ verify(processorA, times(1)).setProperties(proposed.getProperties(), true);
+ }
+
+ @Test
+ public void testSynchronizationStartsProcessor() throws FlowSynchronizationException, TimeoutException, InterruptedException {
+ final VersionedProcessor proposed = createMinimalVersionedProcessor();
+ proposed.setScheduledState(ScheduledState.RUNNING);
+ synchronizer.synchronize(processorA, proposed, group, synchronizationOptions);
+
+ verify(componentScheduler).transitionComponentState(any(Connectable.class), eq(ScheduledState.RUNNING)); // exactly one transition to RUNNING
+ }
+
+ @Test
+ public void testRunningProcessorRestarted() throws FlowSynchronizationException, TimeoutException, InterruptedException {
+ final VersionedProcessor proposed = createMinimalVersionedProcessor();
+ proposed.setScheduledState(ScheduledState.RUNNING);
+
+ // Processor A reports itself as running, and stopping it through the group completes immediately.
+ when(group.stopProcessor(processorA)).thenReturn(CompletableFuture.completedFuture(null));
+ when(processorA.isRunning()).thenReturn(true);
+
+ synchronizer.synchronize(processorA, proposed, group, synchronizationOptions);
+ verify(componentScheduler, Mockito.atLeastOnce()).startComponent(any(Connectable.class));
+ verify(processorA).setProperties(proposed.getProperties(), true);
+ verify(group).stopProcessor(processorA);
+ }
+
+ @Test
+ public void testTimeoutWhenProcessorDoesNotStop() { // with THROW_TIMEOUT_EXCEPTION, a processor that fails to stop in time must be left unmodified
+ final VersionedProcessor versionedProcessor = createMinimalVersionedProcessor();
+ versionedProcessor.setScheduledState(ScheduledState.RUNNING);
+ startProcessor(processorA, false);
+
+ synchronizationOptions = createQuickFailSynchronizationOptions(FlowSynchronizationOptions.ComponentStopTimeoutAction.THROW_TIMEOUT_EXCEPTION);
+
+ assertThrows(TimeoutException.class, () -> {
+ synchronizer.synchronize(processorA, versionedProcessor, group, synchronizationOptions);
+ });
+
+ verifyStopped(processorA);
+ verifyNotRestarted(processorA);
+ verify(processorA, times(0)).terminate();
+ verify(processorA, times(0)).setProperties(anyMap(), anyBoolean()); // the two-argument overload is the one verified in the success-path tests; match any arguments so no update slips through
+ verify(processorA, times(0)).setName(versionedProcessor.getName());
+ }
+
+ @Test
+ public void testTerminateWhenProcessorDoesNotStop() throws FlowSynchronizationException, TimeoutException, InterruptedException {
+ final VersionedProcessor proposed = createMinimalVersionedProcessor();
+ proposed.setScheduledState(ScheduledState.RUNNING);
+ startProcessor(processorA, false);
+
+ synchronizationOptions = createQuickFailSynchronizationOptions(FlowSynchronizationOptions.ComponentStopTimeoutAction.TERMINATE);
+ synchronizer.synchronize(processorA, proposed, group, synchronizationOptions);
+
+ verifyStopped(processorA);
+ verifyRestarted(processorA);
+ verify(processorA).setName(proposed.getName()); // the update is still applied once the processor has been terminated
+ verify(processorA).setProperties(proposed.getProperties(), true);
+ verify(processorA).terminate();
+ }
+
+ @Test
+ public void testUpdateConnectionWithSourceDestStopped() throws FlowSynchronizationException, TimeoutException {
+ final VersionedConnection proposed = createMinimalVersionedConnection(processorA, processorB);
+ proposed.setName("Hello");
+ synchronizer.synchronize(connectionAB, proposed, group, synchronizationOptions);
+
+ final Relationship success = new Relationship.Builder().name("success").build();
+ verify(connectionAB).setRelationships(Collections.singleton(success)); // only the "success" relationship is selected
+ verify(connectionAB).setName("Hello");
+ }
+
+ @Test
+ public void testUpdateConnectionStopsSource() throws FlowSynchronizationException, TimeoutException {
+ startProcessor(processorA);
+
+ final VersionedConnection proposed = createMinimalVersionedConnection(processorA, processorB);
+ proposed.setName("Hello");
+
+ synchronizer.synchronize(connectionAB, proposed, group, synchronizationOptions);
+
+ // The new name was applied exactly once
+ verify(connectionAB).setName("Hello");
+
+ // The running source was stopped for the update and restarted afterwards
+ verifyStopped(processorA);
+ verifyRestarted(processorA);
+ }
+
+ @Test
+ public void testSourceTerminatedIfNotStopped() throws FlowSynchronizationException, TimeoutException {
+ startProcessor(processorA, false);
+
+ final VersionedConnection proposed = createMinimalVersionedConnection(processorA, processorB);
+ proposed.setName("Hello");
+
+ synchronizationOptions = createQuickFailSynchronizationOptions(FlowSynchronizationOptions.ComponentStopTimeoutAction.TERMINATE);
+ synchronizer.synchronize(connectionAB, proposed, group, synchronizationOptions);
+
+ // The source was terminated exactly once
+ verify(processorA).terminate();
+
+ // The new name was applied exactly once
+ verify(connectionAB).setName("Hello");
+
+ // The source was asked to stop and was restarted after the update
+ verifyStopped(processorA);
+ verifyRestarted(processorA);
+ }
+
+ @Test
+ public void testTimeoutStoppingSource() {
+ startProcessor(processorA, false);
+
+ final VersionedConnection versionedConnection = createMinimalVersionedConnection(processorA, processorB);
+ versionedConnection.setName("Hello");
+
+ synchronizationOptions = createQuickFailSynchronizationOptions(FlowSynchronizationOptions.ComponentStopTimeoutAction.THROW_TIMEOUT_EXCEPTION);
+
+ assertThrows(TimeoutException.class, () -> {
+ synchronizer.synchronize(connectionAB, versionedConnection, group, synchronizationOptions);
+ });
+
+ // Ensure that we did not terminate the source
+ verify(processorA, times(0)).terminate();
+
+ // Ensure that the update did not occur
+ verify(connectionAB, times(0)).setName("Hello");
+
+ // Ensure that the source was stopped but not restarted
+ verifyStopped(processorA);
+ verifyNotRestarted(processorA);
+ }
+
+ @Test
+ public void testConnectionRemoval() throws FlowSynchronizationException, TimeoutException {
+ startProcessor(processorA);
+
+ synchronizer.synchronize(connectionAB, null, group, synchronizationOptions);
+
+ // Ensure that the source was stopped but not restarted, and that the connection was removed from the group
+ verifyStopped(processorA);
+ verifyNotRestarted(processorA);
+ verify(group).removeConnection(connectionAB);
+ }
+
+ @Test
+ public void testFailsIfDestinationStoppedQueueNotEmpty() {
+ startProcessor(processorA);
+ queuesWithData.add(connectionAB.getIdentifier());
+
+ synchronizationOptions = createQuickFailSynchronizationOptions(FlowSynchronizationOptions.ComponentStopTimeoutAction.THROW_TIMEOUT_EXCEPTION);
+
+ assertThrows(FlowSynchronizationException.class, () -> {
+ synchronizer.synchronize(connectionAB, null, group, synchronizationOptions);
+ });
+
+ // Ensure that no update was applied to the connection
+ verify(connectionAB, times(0)).setName("Hello");
+
+ // Ensure that the source was stopped but not restarted. We don't restart in this situation because the intent is to drop
+ // the connection so we will leave the source stopped so that the data can eventually drain from the queue and the connection
+ // can be removed.
+ verifyStopped(processorA);
+ verifyNotRestarted(processorA);
+ }
+
+ @Test
+ public void testWaitForQueueToEmpty() throws InterruptedException {
+ startProcessor(processorA);
+ startProcessor(processorB);
+ queuesWithData.add(connectionAB.getIdentifier());
+
+ // Synchronize on a background thread. A failure is captured and asserted on the test thread, because an assertion raised on the background thread would not fail the test.
+ final CountDownLatch completionLatch = new CountDownLatch(1);
+ final Throwable[] syncFailure = new Throwable[1];
+ final Thread syncThread = new Thread(() -> {
+ try {
+ synchronizer.synchronize(connectionAB, null, group, synchronizationOptions);
+ } catch (final Throwable t) {
+ syncFailure[0] = t;
+ }
+ completionLatch.countDown();
+ });
+ syncThread.start();
+
+ // Wait up to 1/2 second to ensure that the task does not complete. The captured failure is only read once the latch has been released.
+ final boolean completed = completionLatch.await(500, TimeUnit.MILLISECONDS);
+ assertFalse("Synchronization finished while the queue still held data; failure: " + (completed ? syncFailure[0] : null), completed);
+
+ queuesWithData.clear(); // Clear the queue's data so that the removal can proceed.
+
+ // The task should now complete quickly. Give up to 5 seconds in case this is run in a slow environment.
+ assertTrue(completionLatch.await(5, TimeUnit.SECONDS));
+ Assert.assertNull("Synchronization failed: " + syncFailure[0], syncFailure[0]);
+
+ verify(connectionAB, times(0)).setName("Hello"); // The connection is removed, not updated.
+
+ // Ensure that the source was stopped, destination was stopped, and the connection was removed.
+ verifyStopped(processorA);
+ verifyNotRestarted(processorA);
+ verifyStopped(processorB);
+ verifyNotRestarted(processorB);
+ verify(group, times(1)).removeConnection(connectionAB);
+ }
+
+ @Test
+ public void testPortUpdatedWhenStopped() throws FlowSynchronizationException, InterruptedException, TimeoutException {
+ final VersionedPort proposedInput = createMinimalVersionedPort(ComponentType.INPUT_PORT);
+ synchronizer.synchronize(inputPort, proposedInput, group, synchronizationOptions);
+
+ verifyNotRestarted(inputPort);
+ verify(inputPort, times(1)).setName("Input"); // a stopped input port is renamed without being started
+
+ final VersionedPort proposedOutput = createMinimalVersionedPort(ComponentType.OUTPUT_PORT);
+ synchronizer.synchronize(outputPort, proposedOutput, group, synchronizationOptions);
+
+ verifyNotRestarted(outputPort);
+ verify(outputPort, times(1)).setName("Output"); // the same holds for the output port
+ }
+
+ @Test
+ public void testPortStarted() throws FlowSynchronizationException, InterruptedException, TimeoutException {
+ final VersionedPort proposed = createMinimalVersionedPort(ComponentType.INPUT_PORT);
+ proposed.setScheduledState(ScheduledState.RUNNING);
+ synchronizer.synchronize(inputPort, proposed, group, synchronizationOptions);
+
+ verify(inputPort, times(1)).setName("Input");
+ verify(componentScheduler, Mockito.atLeastOnce()).transitionComponentState(inputPort, ScheduledState.RUNNING); // the port is handed to the scheduler as RUNNING
+ }
+
+ @Test
+ public void testPortRestarted() throws FlowSynchronizationException, InterruptedException, TimeoutException { // NOTE(review): body is identical to testPortStarted; the port is never put into a running state first, so a stop/restart is not exercised - confirm intent
+ final VersionedPort versionedInputPort = createMinimalVersionedPort(ComponentType.INPUT_PORT);
+ versionedInputPort.setScheduledState(ScheduledState.RUNNING);
+ synchronizer.synchronize(inputPort, versionedInputPort, group, synchronizationOptions);
+
+ verify(componentScheduler, atLeast(1)).transitionComponentState(inputPort, ScheduledState.RUNNING);
+ verify(inputPort).setName("Input");
+ }
+
+ @Test
+ public void testRemoveOutputPortFailsIfIncomingConnection() {
+ // Give the output port an incoming connection from Processor A.
+ createMockConnection(processorA, outputPort, group);
+
+ // Removing the port (null proposed state) must then be rejected.
+ assertThrows(FlowSynchronizationException.class, () -> synchronizer.synchronize(outputPort, null, group, synchronizationOptions));
+ }
+
+ @Test
+ public void testRemoveInputPortFailsIfOutgoingConnectionNotEmpty() throws FlowSynchronizationException, InterruptedException, TimeoutException {
+ synchronizationOptions = createQuickFailSynchronizationOptions(FlowSynchronizationOptions.ComponentStopTimeoutAction.THROW_TIMEOUT_EXCEPTION);
+ final Connection outgoing = createMockConnection(inputPort, processorA, group);
+
+ // With an empty outgoing queue the removal goes through.
+ synchronizer.synchronize(inputPort, null, group, synchronizationOptions);
+
+ // Put data into the outgoing queue.
+ final String queueId = outgoing.getIdentifier();
+ queuesWithData.add(queueId);
+
+ // The destination is not running, so the removal is rejected with a FlowSynchronizationException.
+ assertThrows(FlowSynchronizationException.class,
+ () -> synchronizer.synchronize(inputPort, null, group, synchronizationOptions));
+
+ // Once the destination is running, the removal instead fails with a TimeoutException.
+ startProcessor(processorA);
+
+ assertThrows(TimeoutException.class,
+ () -> synchronizer.synchronize(inputPort, null, group, synchronizationOptions));
+ }
+
+
+ @Test
+ public void testAddsControllerService() throws FlowSynchronizationException, InterruptedException, TimeoutException {
+ final VersionedControllerService proposed = createMinimalVersionedControllerService();
+ synchronizer.synchronize(null, proposed, group, synchronizationOptions);
+
+ verify(group, times(1)).addControllerService(any(ControllerServiceNode.class)); // no existing service, so one is added to the group
+ }
+
+ @Test
+ public void testControllerServiceRemoved() throws FlowSynchronizationException, InterruptedException, TimeoutException {
+ final ControllerServiceReference references = Mockito.mock(ControllerServiceReference.class);
+ final ControllerServiceNode enabledService = createMockControllerService();
+ when(enabledService.getReferences()).thenReturn(references);
+ when(enabledService.getState()).thenReturn(ControllerServiceState.ENABLED);
+ when(enabledService.isActive()).thenReturn(true);
+
+ when(controllerServiceProvider.disableControllerServicesAsync(anyCollection())).thenReturn(CompletableFuture.completedFuture(null));
+ when(controllerServiceProvider.unscheduleReferencingComponents(enabledService)).thenReturn(Collections.emptyMap());
+ synchronizer.synchronize(enabledService, null, group, synchronizationOptions);
+
+ verify(controllerServiceProvider, times(1)).removeControllerService(enabledService); // removal, preceded by unscheduling references and disabling the service
+ verify(controllerServiceProvider, times(1)).disableControllerServicesAsync(Collections.singleton(enabledService));
+ verify(controllerServiceProvider, times(1)).unscheduleReferencingComponents(enabledService);
+ }
+
+ @Test
+ public void testReferencesStoppedAndRestarted() throws FlowSynchronizationException, InterruptedException, TimeoutException {
+ final ControllerServiceNode service = createMockControllerService();
+ when(service.isActive()).thenReturn(true);
+ when(service.getState()).thenReturn(ControllerServiceState.ENABLED);
+
+ // Make Processors A and B reference the controller service, but start only Processor B
+ setReferences(service, processorA, processorB);
+ startProcessor(processorB);
+
+ when(controllerServiceProvider.unscheduleReferencingComponents(service)).thenReturn(Collections.singletonMap(processorB, CompletableFuture.completedFuture(null))); // B's stop completes immediately
+ when(controllerServiceProvider.disableControllerServicesAsync(anyCollection())).thenReturn(CompletableFuture.completedFuture(null));
+
+ final VersionedControllerService versionedControllerService = createMinimalVersionedControllerService();
+ versionedControllerService.setName("Hello");
+ versionedControllerService.setScheduledState(ScheduledState.RUNNING);
+
+ synchronizer.synchronize(service, versionedControllerService, group, synchronizationOptions);
+
+ verify(controllerServiceProvider).unscheduleReferencingComponents(service);
+ verify(controllerServiceProvider).disableControllerServicesAsync(Collections.singleton(service));
+ verify(controllerServiceProvider).enableControllerServicesAsync(Collections.singleton(service));
+ verify(controllerServiceProvider).scheduleReferencingComponents(service, Collections.singleton(processorB), componentScheduler); // only B, the component that was unscheduled, is rescheduled
+ verify(service).setName("Hello");
+ }
+
+ @Test
+ public void testTerminateReferenceOnTimeout() throws FlowSynchronizationException, InterruptedException, TimeoutException {
+ final ControllerServiceNode service = createMockControllerService();
+ when(service.isActive()).thenReturn(true);
+ when(service.getState()).thenReturn(ControllerServiceState.ENABLED);
+
+ // Make Processors A and B reference the controller service, but start only Processor B
+ setReferences(service, processorA, processorB);
+ startProcessor(processorB, false);
+
+ synchronizationOptions = createQuickFailSynchronizationOptions(FlowSynchronizationOptions.ComponentStopTimeoutAction.TERMINATE);
+
+ // When unscheduleReferencingComponents is called, return a Future that will never complete.
+ when(controllerServiceProvider.unscheduleReferencingComponents(service)).thenReturn(Collections.singletonMap(processorB, new CompletableFuture<>()));
+ when(controllerServiceProvider.disableControllerServicesAsync(anyCollection())).thenReturn(CompletableFuture.completedFuture(null));
+
+ final VersionedControllerService versionedControllerService = createMinimalVersionedControllerService();
+ versionedControllerService.setName("Hello");
+ versionedControllerService.setScheduledState(ScheduledState.RUNNING);
+
+ synchronizer.synchronize(service, versionedControllerService, group, synchronizationOptions);
+
+ verify(controllerServiceProvider).unscheduleReferencingComponents(service);
+ verify(controllerServiceProvider).disableControllerServicesAsync(Collections.singleton(service));
+ verify(processorB).terminate(); // B's stop future never completes, so B is terminated
+ verify(processorA, times(0)).terminate(); // A was never started, so it must not be terminated
+ verify(controllerServiceProvider).enableControllerServicesAsync(Collections.singleton(service));
+ verify(controllerServiceProvider).scheduleReferencingComponents(service, Collections.singleton(processorB), componentScheduler);
+ verify(service).setName("Hello");
+ }
+
+
+ @Test
+ public void testCreatingParameterContext() throws FlowSynchronizationException, InterruptedException, TimeoutException { // also called by other tests in this class to create the initial context
+ final Map<String, String> parameterMap = new HashMap<>();
+ parameterMap.put("abc", "xyz");
+ parameterMap.put("secret", "yes");
+ final VersionedParameterContext proposed = createVersionedParameterContext("Context 1", parameterMap, Collections.singleton("secret"));
+
+ synchronizer.synchronize(null, proposed, synchronizationOptions);
+
+ final Set<ParameterContext> contexts = parameterContextManager.getParameterContexts();
+ assertEquals(1, contexts.size());
+
+ final ParameterContext created = contexts.iterator().next();
+ assertEquals(proposed.getName(), created.getName()); // expected value first, so a failure message reads the right way round
+
+ final Map<ParameterDescriptor, Parameter> createdParameters = created.getParameters();
+ assertEquals(2, createdParameters.size());
+
+ final Parameter abc = created.getParameter("abc").get();
+ assertEquals("abc", abc.getDescriptor().getName());
+ assertFalse(abc.getDescriptor().isSensitive());
+ assertEquals("xyz", abc.getValue());
+
+ final Parameter secret = created.getParameter("secret").get();
+ assertEquals("secret", secret.getDescriptor().getName());
+ assertTrue(secret.getDescriptor().isSensitive()); // "secret" was passed in the third argument when creating the proposed context
+ assertEquals("yes", secret.getValue());
+ }
+
+ @Test
+ public void testUpdateParametersNoReferences() throws FlowSynchronizationException, InterruptedException, TimeoutException {
+ // Start from the context that the creation test builds ("Context 1": abc=xyz, secret=yes)
+ testCreatingParameterContext();
+
+ final ParameterContext current = parameterContextManager.getParameterContexts().iterator().next();
+
+ final Map<String, String> updatedValues = new HashMap<>();
+ updatedValues.put("secret", "maybe");
+ updatedValues.put("abc", "123");
+
+ final VersionedParameterContext proposed = createVersionedParameterContext("Context 2", updatedValues, Collections.singleton("secret"));
+
+ synchronizer.synchronize(current, proposed, synchronizationOptions);
+
+ assertEquals("Context 2", current.getName());
+ assertEquals("maybe", current.getParameter("secret").get().getValue());
+ assertEquals("123", current.getParameter("abc").get().getValue());
+ }
+
+ /**
+ * Verifies that when a running Processor references an updated parameter and its stop request never completes,
+ * synchronization throws a TimeoutException and the existing Parameter Context keeps its original name and values.
+ */
+ @Test
+ public void testUpdateParametersReferenceProcessorNotStopping() throws FlowSynchronizationException, InterruptedException, TimeoutException {
+ // Create the initial context
+ testCreatingParameterContext();
+
+ final ParameterContext existing = parameterContextManager.getParameterContexts().iterator().next();
+
+ final Map<String, String> parameterMap = new HashMap<>();
+ parameterMap.put("abc", "123");
+ parameterMap.put("secret", "maybe");
+
+ final VersionedParameterContext proposed = createVersionedParameterContext("Context 2", parameterMap, Collections.singleton("secret"));
+
+ // processorA is running, and the future returned when it is asked to stop is never completed
+ final ProcessorNode processorA = createMockProcessor();
+ startProcessor(processorA, false);
+ // NOTE(review): createQuickFailSynchronizationOptions is defined outside this view; presumably it uses a very short stop timeout - confirm
+ synchronizationOptions = createQuickFailSynchronizationOptions(FlowSynchronizationOptions.ComponentStopTimeoutAction.THROW_TIMEOUT_EXCEPTION);
+ when(parameterReferenceManager.getProcessorsReferencing(existing, "abc")).thenReturn(Collections.singleton(processorA));
+
+ assertThrows(TimeoutException.class, () -> {
+ synchronizer.synchronize(existing, proposed, synchronizationOptions);
+ });
+
+ // Updates should not occur.
+ assertEquals("xyz", existing.getParameter("abc").get().getValue());
+ assertEquals("yes", existing.getParameter("secret").get().getValue());
+ assertEquals("Context 1", existing.getName());
+ }
+
+ /**
+ * Verifies that when an enabled Controller Service references an updated parameter, the components referencing
+ * the service are unscheduled, the service is disabled, the parameter context is updated, and afterwards the
+ * service is re-enabled and processorB is started again.
+ */
+ @Test
+ public void testUpdateParametersReferenceStopping() throws FlowSynchronizationException, InterruptedException, TimeoutException {
+ // Create the initial context
+ testCreatingParameterContext();
+
+ final ParameterContext existing = parameterContextManager.getParameterContexts().iterator().next();
+
+ final Map<String, String> parameterMap = new HashMap<>();
+ parameterMap.put("abc", "123");
+ parameterMap.put("secret", "maybe");
+
+ final VersionedParameterContext proposed = createVersionedParameterContext("Context 2", parameterMap, Collections.singleton("secret"));
+
+ final ProcessorNode processorA = createMockProcessor();
+ startProcessor(processorA, true);
+
+ final ProcessorNode processorB = createMockProcessor();
+
+ // Backing flag for the service mock: flipped to false by the disableControllerServicesAsync stub below
+ final AtomicBoolean serviceActive = new AtomicBoolean(true);
+
+ final ControllerServiceNode service = createMockControllerService();
+ when(service.isActive()).thenAnswer(invocation -> serviceActive.get());
+ when(service.getState()).thenAnswer(invocation -> serviceActive.get() ? ControllerServiceState.ENABLED : ControllerServiceState.DISABLED);
+
+ // Make Processors A and B reference the controller service and start them
+ setReferences(service, processorA, processorB);
+ startProcessor(processorB);
+
+ // Only processorB is reported as unscheduled, with a future that is already complete
+ when(controllerServiceProvider.unscheduleReferencingComponents(service)).thenReturn(Collections.singletonMap(processorB, CompletableFuture.completedFuture(null)));
+ when(controllerServiceProvider.disableControllerServicesAsync(anyCollection())).thenAnswer(invocation -> {
+ serviceActive.set(false);
+ return CompletableFuture.completedFuture(null);
+ });
+
+ // Parameter "abc" is referenced by the service only, not directly by any processor
+ when(parameterReferenceManager.getProcessorsReferencing(existing, "abc")).thenReturn(Collections.emptySet());
+ when(parameterReferenceManager.getControllerServicesReferencing(existing, "abc")).thenReturn(Collections.singleton(service));
+
+ synchronizer.synchronize(existing, proposed, synchronizationOptions);
+
+ // Updates should occur.
+ assertEquals("123", existing.getParameter("abc").get().getValue());
+ assertEquals("maybe", existing.getParameter("secret").get().getValue());
+ assertEquals("Context 2", existing.getName());
+
+ // Verify controller service/reference lifecycles
+ verify(controllerServiceProvider).unscheduleReferencingComponents(service);
+ verify(controllerServiceProvider).disableControllerServicesAsync(Collections.singleton(service));
+ verify(controllerServiceProvider).enableControllerServicesAsync(Collections.singleton(service));
+ verify(componentScheduler).startComponent(processorB);
+ }
+
+ /**
+ * Verifies that when a Controller Service referencing an updated parameter never finishes disabling,
+ * synchronization throws a TimeoutException even with the TERMINATE timeout action, the Parameter Context is left
+ * unchanged, the service is not re-enabled, and the processors that were unscheduled are started again.
+ */
+ @Test
+ public void testUpdateParametersControllerServiceNotDisabling() throws FlowSynchronizationException, InterruptedException, TimeoutException {
+ // Create the initial context
+ testCreatingParameterContext();
+
+ final ParameterContext existing = parameterContextManager.getParameterContexts().iterator().next();
+
+ final Map<String, String> parameterMap = new HashMap<>();
+ parameterMap.put("abc", "123");
+ parameterMap.put("secret", "maybe");
+
+ final VersionedParameterContext proposed = createVersionedParameterContext("Context 2", parameterMap, Collections.singleton("secret"));
+
+ final ProcessorNode processorA = createMockProcessor();
+ final ProcessorNode processorB = createMockProcessor();
+
+ // The service mock always reports itself as active and ENABLED, regardless of any disable request
+ final ControllerServiceNode service = createMockControllerService();
+ when(service.isActive()).thenReturn(true);
+ when(service.getState()).thenReturn(ControllerServiceState.ENABLED);
+
+ // Make Processors A and B reference the controller service and start them
+ setReferences(service, processorA, processorB);
+ startProcessor(processorA, true);
+ startProcessor(processorB);
+
+ // Both processors are reported as unscheduled, each with an already-completed future
+ final Map<ComponentNode, Future<Void>> completedFutureMap = new HashMap<>();
+ completedFutureMap.put(processorA, CompletableFuture.completedFuture(null));
+ completedFutureMap.put(processorB, CompletableFuture.completedFuture(null));
+
+ when(controllerServiceProvider.unscheduleReferencingComponents(service)).thenReturn(completedFutureMap);
+ when(controllerServiceProvider.disableControllerServicesAsync(anyCollection())).thenReturn(new CompletableFuture<>()); // Never complete future = never disable service
+
+ // NOTE(review): createQuickFailSynchronizationOptions is defined outside this view; presumably it uses a very short stop timeout - confirm
+ synchronizationOptions = createQuickFailSynchronizationOptions(FlowSynchronizationOptions.ComponentStopTimeoutAction.TERMINATE);
+
+ when(parameterReferenceManager.getProcessorsReferencing(existing, "abc")).thenReturn(Collections.emptySet());
+ when(parameterReferenceManager.getControllerServicesReferencing(existing, "abc")).thenReturn(Collections.singleton(service));
+
+ assertThrows(TimeoutException.class, () -> {
+ synchronizer.synchronize(existing, proposed, synchronizationOptions);
+ });
+
+ // Updates should not occur.
+ assertEquals("xyz", existing.getParameter("abc").get().getValue());
+ assertEquals("yes", existing.getParameter("secret").get().getValue());
+ assertEquals("Context 1", existing.getName());
+
+ // Verify controller service/reference lifecycles
+ verify(controllerServiceProvider).unscheduleReferencingComponents(service);
+ verify(controllerServiceProvider).disableControllerServicesAsync(Collections.singleton(service));
+ verify(controllerServiceProvider, times(0)).enableControllerServicesAsync(Collections.singleton(service));
+ verify(componentScheduler).startComponent(processorA);
+ verify(componentScheduler).startComponent(processorB);
+ }
+
+ /**
+ * Exercises getUpdatedParameterNames against a context holding abc=xyz and sensitive secret=yes. Covers
+ * unchanged values, a changed non-sensitive value, a changed sensitive value, removed parameters, an added
+ * parameter, and a mixture of additions, removals and updates.
+ */
+ @Test
+ public void testGetUpdatedParameterNames() throws FlowSynchronizationException, InterruptedException, TimeoutException {
+ testCreatingParameterContext();
+ final ParameterContext currentContext = parameterContextManager.getParameterContexts().iterator().next();
+
+ // Mirrors the values that testCreatingParameterContext() stored in the context
+ final Map<String, String> baseline = new HashMap<>();
+ baseline.put("abc", "xyz");
+ baseline.put("secret", "yes");
+
+ // Identical values: nothing is reported
+ Map<String, String> candidateValues = new HashMap<>(baseline);
+ VersionedParameterContext candidate = createVersionedParameterContext("Context 2", candidateValues, Collections.singleton("secret"));
+ assertEquals(Collections.emptySet(), synchronizer.getUpdatedParameterNames(currentContext, candidate));
+
+ // Only the non-sensitive value differs
+ candidateValues = new HashMap<>(baseline);
+ candidateValues.put("abc", "hello");
+ candidate = createVersionedParameterContext("Context 2", candidateValues, Collections.singleton("secret"));
+ assertEquals(Collections.singleton("abc"), synchronizer.getUpdatedParameterNames(currentContext, candidate));
+
+ // Only the sensitive value differs
+ candidateValues = new HashMap<>(baseline);
+ candidateValues.put("secret", "secret");
+ candidate = createVersionedParameterContext("Context 2", candidateValues, Collections.singleton("secret"));
+ assertEquals(Collections.singleton("secret"), synchronizer.getUpdatedParameterNames(currentContext, candidate));
+
+ // No parameters proposed at all: both existing parameters count as removed
+ candidateValues = new HashMap<>();
+ candidate = createVersionedParameterContext("Context 2", candidateValues, Collections.singleton("secret"));
+ final Set<String> expectedRemoved = new HashSet<>(Arrays.asList("abc", "secret"));
+ assertEquals(expectedRemoved, synchronizer.getUpdatedParameterNames(currentContext, candidate));
+
+ // One extra parameter on top of the unchanged baseline
+ candidateValues = new HashMap<>(baseline);
+ candidateValues.put("Added", "Added");
+ candidate = createVersionedParameterContext("Context 2", candidateValues, Collections.singleton("secret"));
+ assertEquals(Collections.singleton("Added"), synchronizer.getUpdatedParameterNames(currentContext, candidate));
+
+ // Two additions, one removal ("secret") and one changed value ("abc") in a single proposal
+ candidateValues = new HashMap<>(baseline);
+ candidateValues.put("Added", "Added");
+ candidateValues.put("Added 2", "Added");
+ candidateValues.remove("secret");
+ candidateValues.put("abc", "hello");
+ candidate = createVersionedParameterContext("Context 2", candidateValues, Collections.singleton("secret"));
+ final Set<String> expectedMixed = new HashSet<>(Arrays.asList("abc", "secret", "Added", "Added 2"));
+ assertEquals(expectedMixed, synchronizer.getUpdatedParameterNames(currentContext, candidate));
+ }
+
+
+ /**
+ * Builds a VersionedParameterContext with the given name, a fixed description, a random UUID identifier and one
+ * VersionedParameter per map entry. A parameter is flagged sensitive when its name is in sensitiveParamNames.
+ *
+ * @param name the name to give the context
+ * @param parameters parameter names mapped to their values
+ * @param sensitiveParamNames names of the parameters that should be marked sensitive
+ * @return the populated context
+ */
+ private VersionedParameterContext createVersionedParameterContext(final String name, final Map<String, String> parameters, final Set<String> sensitiveParamNames) {
+ final VersionedParameterContext versionedContext = new VersionedParameterContext();
+ versionedContext.setIdentifier(UUID.randomUUID().toString());
+ versionedContext.setName(name);
+ versionedContext.setDescription("Generated for unit test");
+
+ final Set<VersionedParameter> converted = new HashSet<>();
+ parameters.forEach((paramName, paramValue) -> {
+ final VersionedParameter versionedParameter = new VersionedParameter();
+ versionedParameter.setName(paramName);
+ versionedParameter.setValue(paramValue);
+ versionedParameter.setSensitive(sensitiveParamNames.contains(paramName));
+ converted.add(versionedParameter);
+ });
+ versionedContext.setParameters(converted);
+
+ return versionedContext;
+ }
+
+ /**
+ * Stubs the given service mock so that getReferences() returns a mocked ControllerServiceReference whose
+ * referencing components are exactly the supplied components.
+ */
+ private void setReferences(final ControllerServiceNode service, final ComponentNode... reference) {
+ final Set<ComponentNode> referencingComponents = new HashSet<>(Arrays.asList(reference));
+
+ final ControllerServiceReference mockReference = Mockito.mock(ControllerServiceReference.class);
+ when(mockReference.getReferencingComponents()).thenReturn(referencingComponents);
+ when(service.getReferences()).thenReturn(mockReference);
+ }
+
+
+ //////////
+ // Convenience methods for testing
+ //////////
+
+ /**
+ * Marks the given processor mock as running, with a stop request that completes immediately.
+ * Shorthand for startProcessor(processor, true).
+ */
+ private void startProcessor(final ProcessorNode processor) {
+ startProcessor(processor, true);
+ }
+
+ /**
+ * Stubs the processor mock to report itself as running, and stubs group.stopProcessor(processor) to return a
+ * future that is already complete when allowStopToComplete is true, or one that is never completed otherwise.
+ */
+ private void startProcessor(final ProcessorNode processor, final boolean allowStopToComplete) {
+ when(processor.isRunning()).thenReturn(true);
+ when(processor.getScheduledState()).thenReturn(org.apache.nifi.controller.ScheduledState.RUNNING);
+
+ final CompletableFuture<Void> stopFuture;
+ if (allowStopToComplete) {
+ // Stopping finishes immediately
+ stopFuture = CompletableFuture.completedFuture(null);
+ } else {
+ // Nothing ever completes this future, so a caller waiting on the stop will time out
+ stopFuture = new CompletableFuture<>();
+ }
+ when(group.stopProcessor(processor)).thenReturn(stopFuture);
+ }
+
+ /**
+ * Asserts that the group mock was asked to stop the given processor at least once.
+ */
+ private void verifyStopped(final ProcessorNode processor) {
+ verify(group, atLeast(1)).stopProcessor(processor);
+ }
+
+ private void verifyStopped(final Port port) {
+ if (port.getConnectableType() == ConnectableType.INPUT_PORT) {
+ verify(group, atLeast(1)).stopInputPort(port);
+ } else {
+ verify(group, atLeast(1)).stopOutputPort(port);
+ }
+ }
+
+ /**
+ * Asserts that the component scheduler mock was asked to start the given component at least once.
+ */
+ private void verifyRestarted(final Connectable component) {
+ verify(componentScheduler, atLeast(1)).startComponent(component);
+ }
+
+ /**
+ * Asserts that the component scheduler mock was never asked to start the given component.
+ */
+ private void verifyNotRestarted(final Connectable component) {
+ // atLeast(0) is satisfied by any number of invocations and so could never fail; times(0) requires zero calls
+ verify(componentScheduler, times(0)).startComponent(component);
+ }
+
+ /**
+ * Builds a VersionedConnection from source to destination with fixed settings: "1 GB" / 10000 object
+ * back-pressure thresholds, no load balancing, the "success" relationship, label index 0 and z-index 0.
+ */
+ private VersionedConnection createMinimalVersionedConnection(final ProcessorNode source, final ProcessorNode destination) {
+ final VersionedConnection connection = new VersionedConnection();
+
+ // Endpoints
+ connection.setSource(createConnectableComponent(source));
+ connection.setDestination(createConnectableComponent(destination));
+ connection.setSelectedRelationships(Collections.singleton("success"));
+
+ // Queue behaviour
+ connection.setBackPressureDataSizeThreshold("1 GB");
+ connection.setBackPressureObjectThreshold(10000L);
+ connection.setLoadBalanceStrategy(LoadBalanceStrategy.DO_NOT_LOAD_BALANCE.name());
+
+ // Layout
+ connection.setLabelIndex(0);
+ connection.setzIndex(0L);
+
+ return connection;
+ }
+
+ /**
+ * Builds a ConnectableComponent of type PROCESSOR whose id and instance identifier are both the given
+ * processor's identifier.
+ */
+ private ConnectableComponent createConnectableComponent(final ProcessorNode processor) {
+ final ConnectableComponent connectable = new ConnectableComponent();
+ connectable.setType(ConnectableComponentType.PROCESSOR);
+ connectable.setId(processor.getIdentifier());
+ connectable.setInstanceIdentifier(processor.getIdentifier());
+ return connectable;
+ }
+
+ /**
+ * Builds a VersionedProcessor with identifier "12345", name "name", the shared test bundle, ENABLED scheduled
+ * state, timer-driven scheduling on all nodes with one concurrent task, a single property abc=123 and
+ * position (0, 0).
+ */
+ private VersionedProcessor createMinimalVersionedProcessor() {
+ final VersionedProcessor processor = new VersionedProcessor();
+
+ // Identity
+ processor.setIdentifier("12345");
+ processor.setName("name");
+ processor.setBundle(bundle);
+ processor.setPosition(new Position(0D, 0D));
+
+ // Scheduling
+ processor.setScheduledState(ScheduledState.ENABLED);
+ processor.setSchedulingStrategy(SchedulingStrategy.TIMER_DRIVEN.name());
+ processor.setExecutionNode(ExecutionNode.ALL.name());
+ processor.setConcurrentlySchedulableTaskCount(1);
+ processor.setRunDurationMillis(0L);
+
+ // Configuration
+ processor.setProperties(Collections.singletonMap("abc", "123"));
+ processor.setPropertyDescriptors(Collections.emptyMap());
+ processor.setAutoTerminatedRelationships(Collections.emptySet());
+ processor.setBulletinLevel(LogLevel.WARN.name());
+
+ return processor;
+ }
+
+ /**
+ * Builds a VersionedControllerService with identifier "12345", name "name", type "ControllerServiceImpl",
+ * the shared test bundle, DISABLED scheduled state, a single property abc=123 and position (0, 0).
+ */
+ private VersionedControllerService createMinimalVersionedControllerService() {
+ final VersionedControllerService controllerService = new VersionedControllerService();
+
+ // Identity
+ controllerService.setIdentifier("12345");
+ controllerService.setName("name");
+ controllerService.setType("ControllerServiceImpl");
+ controllerService.setBundle(bundle);
+ controllerService.setPosition(new Position(0D, 0D));
+
+ // State and configuration
+ controllerService.setScheduledState(ScheduledState.DISABLED);
+ controllerService.setProperties(Collections.singletonMap("abc", "123"));
+ controllerService.setPropertyDescriptors(Collections.emptyMap());
+
+ return controllerService;
+ }
+
+ /**
+ * Builds a VersionedPort with identifier and instance identifier "1234", ENABLED scheduled state, one concurrent
+ * task and position (0, 0). The name is "Input" for an INPUT_PORT and "Output" for any other component type.
+ *
+ * @param componentType the type of port to describe
+ * @return the populated port
+ */
+ private VersionedPort createMinimalVersionedPort(final ComponentType componentType) {
+ final VersionedPort versionedPort = new VersionedPort();
+ versionedPort.setIdentifier("1234");
+ versionedPort.setInstanceIdentifier("1234");
+ versionedPort.setName(componentType == ComponentType.INPUT_PORT ? "Input" : "Output");
+ versionedPort.setScheduledState(ScheduledState.ENABLED);
+ // Use the requested type; hard-coding INPUT_PORT here produced an "Output"-named port typed as an input port
+ versionedPort.setComponentType(componentType);
+ versionedPort.setPosition(new Position(0D, 0D));
+ versionedPort.setConcurrentlySchedulableTaskCount(1);
+
+ return versionedPort;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
index c0923b5b6a..ff94934442 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
@@ -16,13 +16,6 @@
*/
package org.apache.nifi.controller;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
@@ -34,6 +27,13 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.TerminatedTaskException;
import org.apache.nifi.scheduling.SchedulingStrategy;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+
public interface ProcessScheduler {
/**
@@ -224,7 +224,7 @@ public interface ProcessScheduler {
*
* @param taskNode to unschedule
*/
- void unschedule(ReportingTaskNode taskNode);
+ Future<Void> unschedule(ReportingTaskNode taskNode);
/**
* Begins scheduling the given Reporting Task to run
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
index dd9c1652e7..83b0b411a2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
@@ -125,4 +125,11 @@ public interface ReportingTaskNode extends ComponentNode {
*/
List<ConfigVerificationResult> verifyConfiguration(ConfigurationContext context, ComponentLog logger, ExtensionManager extensionManager);
+ void start();
+
+ void stop();
+
+ void enable();
+
+ void disable();
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
index f610349691..cee1e5a303 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
@@ -19,9 +19,11 @@ package org.apache.nifi.controller.service;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.groups.ComponentScheduler;
import org.apache.nifi.nar.ExtensionManager;
import java.util.Collection;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
@@ -134,7 +136,7 @@ public interface ControllerServiceProvider extends ControllerServiceLookup {
*
* @param serviceNode the node
*/
- Set<ComponentNode> unscheduleReferencingComponents(ControllerServiceNode serviceNode);
+ Map<ComponentNode, Future<Void>> unscheduleReferencingComponents(ControllerServiceNode serviceNode);
/**
* Verifies that all Controller Services referencing the provided Controller
@@ -201,6 +203,12 @@ public interface ControllerServiceProvider extends ControllerServiceLookup {
*/
Set<ComponentNode> scheduleReferencingComponents(ControllerServiceNode serviceNode);
+ /**
+ * Schedules any of the candidate components that are currently referencing the given Controller Service to run.
+ *
+ * @param serviceNode the Controller Service whose referencing components are to be scheduled
+ * @param candidates the components that are eligible to be scheduled
+ * @param componentScheduler NOTE(review): presumably the scheduler used to start the selected components - confirm against the implementation
+ * @return the components that were scheduled
+ */
+ Set<ComponentNode> scheduleReferencingComponents(ControllerServiceNode serviceNode, Set<ComponentNode> candidates, ComponentScheduler componentScheduler);
+
/**
*
* @param serviceType type of service to get identifiers for
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/AbstractComponentScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/AbstractComponentScheduler.java
index d1f6cc1ac0..ac44116977 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/AbstractComponentScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/AbstractComponentScheduler.java
@@ -25,6 +25,7 @@ import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.flow.ScheduledState;
import org.apache.nifi.registry.flow.mapping.VersionedComponentStateLookup;
import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.controller.ReportingTaskNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +41,8 @@ public abstract class AbstractComponentScheduler implements ComponentScheduler {
private final VersionedComponentStateLookup stateLookup;
private final AtomicLong pauseCount = new AtomicLong(0L);
- private final Queue<Connectable> toStart = new LinkedBlockingQueue<>();
+ private final Queue<Connectable> connectablesToStart = new LinkedBlockingQueue<>();
+ private final Queue<ReportingTaskNode> reportingTasksToStart = new LinkedBlockingQueue<>();
private final Queue<ControllerServiceNode> toEnable = new LinkedBlockingQueue<>();
public AbstractComponentScheduler(final ControllerServiceProvider controllerServiceProvider, final VersionedComponentStateLookup stateLookup) {
@@ -67,10 +69,16 @@ public abstract class AbstractComponentScheduler implements ComponentScheduler {
enableNow(toEnable);
Connectable connectable;
- while ((connectable = toStart.poll()) != null) {
+ while ((connectable = connectablesToStart.poll()) != null) {
logger.debug("{} starting {}", this, connectable);
startNow(connectable);
}
+
+ ReportingTaskNode taskNode;
+ while ((taskNode = reportingTasksToStart.poll()) != null) {
+ logger.debug("{} starting {}", this, taskNode);
+ startNow(taskNode);
+ }
}
private boolean isPaused() {
@@ -180,7 +188,7 @@ public abstract class AbstractComponentScheduler implements ComponentScheduler {
public void startComponent(final Connectable component) {
if (isPaused()) {
logger.debug("{} called to start {} but paused so will queue it for start later", this, component);
- toStart.offer(component);
+ connectablesToStart.offer(component);
} else {
logger.debug("{} starting {} now", this, component);
startNow(component);
@@ -228,7 +236,19 @@ public abstract class AbstractComponentScheduler implements ComponentScheduler {
return serviceProvider;
}
+ /**
+ * Starts the given Reporting Task immediately, or queues it to be started later if this scheduler is
+ * currently paused.
+ *
+ * @param reportingTask the Reporting Task to start
+ */
+ @Override
+ public void startReportingTask(final ReportingTaskNode reportingTask) {
+ if (isPaused()) {
+ logger.debug("{} called to start {} but paused so will queue it for start later", this, reportingTask);
+ reportingTasksToStart.offer(reportingTask);
+ } else {
+ logger.debug("{} starting {} now", this, reportingTask);
+ startNow(reportingTask);
+ }
+ }
+
protected abstract void startNow(Connectable component);
protected abstract void enableNow(Collection<ControllerServiceNode> controllerServices);
+
+ protected abstract void startNow(ReportingTaskNode reportingTask);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ComponentScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ComponentScheduler.java
index 6a38dde93d..ef156830e3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ComponentScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ComponentScheduler.java
@@ -18,6 +18,7 @@
package org.apache.nifi.groups;
import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.flow.ScheduledState;
@@ -34,6 +35,8 @@ public interface ComponentScheduler {
void disableControllerServicesAsync(Collection<ControllerServiceNode> controllerServices);
+ void startReportingTask(ReportingTaskNode reportingTask);
+
void pause();
void resume();
@@ -60,6 +63,9 @@ public interface ComponentScheduler {
public void disableControllerServicesAsync(final Collection<ControllerServiceNode> controllerServices) {
}
+ // Intentionally a no-op
+ @Override
+ public void startReportingTask(final ReportingTaskNode reportingTask) {
+ }
+
@Override
public void pause() {
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/GroupSynchronizationOptions.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowSynchronizationOptions.java
similarity index 79%
rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/GroupSynchronizationOptions.java
rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowSynchronizationOptions.java
index 661b986fd7..bc7ebb0a58 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/GroupSynchronizationOptions.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowSynchronizationOptions.java
@@ -17,7 +17,9 @@
package org.apache.nifi.groups;
-public class GroupSynchronizationOptions {
+import java.time.Duration;
+
+public class FlowSynchronizationOptions {
private final ComponentIdGenerator componentIdGenerator;
private final ComponentScheduler componentScheduler;
private final PropertyDecryptor propertyDecryptor;
@@ -27,8 +29,10 @@ public class GroupSynchronizationOptions {
private final boolean updateGroupVersionControlSnapshot;
private final boolean updateExistingVariables;
private final boolean updateRpgUrls;
+ private final Duration componentStopTimeout;
+ private final ComponentStopTimeoutAction timeoutAction;
- private GroupSynchronizationOptions(final Builder builder) {
+ private FlowSynchronizationOptions(final Builder builder) {
this.componentIdGenerator = builder.componentIdGenerator;
this.componentScheduler = builder.componentScheduler;
this.propertyDecryptor = builder.propertyDecryptor;
@@ -38,6 +42,8 @@ public class GroupSynchronizationOptions {
this.updateGroupVersionControlSnapshot = builder.updateGroupVersionControlSnapshot;
this.updateExistingVariables = builder.updateExistingVariables;
this.updateRpgUrls = builder.updateRpgUrls;
+ this.componentStopTimeout = builder.componentStopTimeout;
+ this.timeoutAction = builder.timeoutAction;
}
public ComponentIdGenerator getComponentIdGenerator() {
@@ -76,6 +82,13 @@ public class GroupSynchronizationOptions {
return propertyDecryptor;
}
+ /**
+ * @return how long to wait for a component to be fully stopped/disabled, as configured on the Builder
+ */
+ public Duration getComponentStopTimeout() {
+ return componentStopTimeout;
+ }
+
+ /**
+ * @return the action to take if a component does not stop/disable in time, as configured on the Builder
+ */
+ public ComponentStopTimeoutAction getComponentStopTimeoutAction() {
+ return timeoutAction;
+ }
public static class Builder {
private ComponentIdGenerator componentIdGenerator;
@@ -87,6 +100,9 @@ public class GroupSynchronizationOptions {
private boolean updateExistingVariables = false;
private boolean updateRpgUrls = false;
private PropertyDecryptor propertyDecryptor = value -> value;
+ private Duration componentStopTimeout = Duration.ofSeconds(30);
+ private ComponentStopTimeoutAction timeoutAction = ComponentStopTimeoutAction.THROW_TIMEOUT_EXCEPTION;
+
/**
* Specifies the Component ID Generator to use for generating UUID's of components that are to be added to a ProcessGroup
@@ -190,8 +206,28 @@ public class GroupSynchronizationOptions {
return this;
}
+ /**
+ * When stopping or disabling a component, specifies how long to wait for the component to be fully stopped/disabled.
+ * If this method is not called, the Builder's default of 30 seconds applies.
+ * @param duration the duration to wait when stopping or disabling a component
+ * @return the builder
+ */
+ public Builder componentStopTimeout(final Duration duration) {
+ this.componentStopTimeout = duration;
+ return this;
+ }
+
+ /**
+ * If the component doesn't stop/disable in time, specifies what action should be taken.
+ * If this method is not called, the Builder's default of THROW_TIMEOUT_EXCEPTION applies.
+ * @param action the action to take
+ * @return the builder
+ */
+ public Builder componentStopTimeoutAction(final ComponentStopTimeoutAction action) {
+ this.timeoutAction = action;
+ return this;
+ }
+
- public GroupSynchronizationOptions build() {
+ public FlowSynchronizationOptions build() {
if (componentIdGenerator == null) {
throw new IllegalStateException("Must set Component ID Generator");
}
@@ -199,10 +235,10 @@ public class GroupSynchronizationOptions {
throw new IllegalStateException("Must set Component Scheduler");
}
- return new GroupSynchronizationOptions(this);
+ return new FlowSynchronizationOptions(this);
}
- public static Builder from(final GroupSynchronizationOptions options) {
+ public static Builder from(final FlowSynchronizationOptions options) {
final Builder builder = new Builder();
builder.componentIdGenerator = options.getComponentIdGenerator();
builder.componentScheduler = options.getComponentScheduler();
@@ -213,8 +249,23 @@ public class GroupSynchronizationOptions {
builder.updateExistingVariables = options.isUpdateExistingVariables();
builder.updateRpgUrls = options.isUpdateRpgUrls();
builder.propertyDecryptor = options.getPropertyDecryptor();
+ builder.componentStopTimeout = options.getComponentStopTimeout();
+ builder.timeoutAction = options.getComponentStopTimeoutAction();
return builder;
}
}
+
+ /**
+ * The action to take when a component does not stop or disable within the configured component stop timeout.
+ */
+ public enum ComponentStopTimeoutAction {
+ /**
+ * If the timeout occurs, a {@link java.util.concurrent.TimeoutException TimeoutException} should be thrown
+ */
+ THROW_TIMEOUT_EXCEPTION,
+
+ /**
+ * If a timeout occurs when stopping a processor, the Processor should be terminated and no Exception should be thrown.
+ * If a Controller Service or Reporting Task fails to stop/disable in time, a {@link java.util.concurrent.TimeoutException} will still be thrown.
+ */
+ TERMINATE;
+ }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
index 5d253c5f55..7579b721a2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
@@ -889,7 +889,7 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
* @param synchronizationOptions options for how the synchronization should occur
* @param flowMappingOptions options for how to map the existing dataflow into Versioned components so that it can be compared to the proposed snapshot
*/
- void synchronizeFlow(VersionedExternalFlow proposedSnapshot, GroupSynchronizationOptions synchronizationOptions, FlowMappingOptions flowMappingOptions);
+ void synchronizeFlow(VersionedExternalFlow proposedSnapshot, FlowSynchronizationOptions synchronizationOptions, FlowMappingOptions flowMappingOptions);
/**
* Verifies a template with the specified name can be created.
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
index 18f9b980c5..c9b4da9420 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
@@ -35,12 +35,9 @@ import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class StandardReportingTaskNode extends AbstractReportingTaskNode implements ReportingTaskNode {
- private static final Logger logger = LoggerFactory.getLogger(StandardReportingTaskNode.class);
private final FlowController flowController;
public StandardReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final FlowController controller,
@@ -94,4 +91,26 @@ public class StandardReportingTaskNode extends AbstractReportingTaskNode impleme
protected ParameterContext getParameterContext() {
return null;
}
+
+ /**
+ * Starts this Reporting Task: calls {@code verifyCanStart()} and then asks the FlowController to start it.
+ */
+ @Override
+ public void start() {
+ verifyCanStart();
+ flowController.startReportingTask(this);
+ }
+
+ /**
+ * Stops this Reporting Task: calls {@code verifyCanStop()} and then asks the FlowController to stop it.
+ */
+ @Override
+ public void stop() {
+ verifyCanStop();
+ flowController.stopReportingTask(this);
+ }
+
+ /**
+ * Enables this Reporting Task: calls {@code verifyCanEnable()} and then asks the FlowController to enable it.
+ */
+ @Override
+ public void enable() {
+ verifyCanEnable();
+ flowController.enableReportingTask(this);
+ }
+
+ /**
+ * Disables this Reporting Task: calls {@code verifyCanDisable()} and then asks the FlowController to disable it.
+ */
+ @Override
+ public void disable() {
+ verifyCanDisable();
+ flowController.disableReportingTask(this);
+ }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index a7dc11885d..d5d136b176 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -269,10 +269,10 @@ public final class StandardProcessScheduler implements ProcessScheduler {
}
@Override
- public void unschedule(final ReportingTaskNode taskNode) {
+ public Future<Void> unschedule(final ReportingTaskNode taskNode) {
final LifecycleState lifecycleState = getLifecycleState(requireNonNull(taskNode), false);
if (!lifecycleState.isScheduled()) {
- return;
+ return CompletableFuture.completedFuture(null);
}
taskNode.verifyCanStop();
@@ -280,6 +280,8 @@ public final class StandardProcessScheduler implements ProcessScheduler {
final ReportingTask reportingTask = taskNode.getReportingTask();
taskNode.setScheduledState(ScheduledState.STOPPED);
+ final CompletableFuture<Void> future = new CompletableFuture<>();
+
final Runnable unscheduleReportingTaskRunnable = new Runnable() {
@Override
public void run() {
@@ -304,12 +306,14 @@ public final class StandardProcessScheduler implements ProcessScheduler {
if (lifecycleState.getActiveThreadCount() == 0 && lifecycleState.mustCallOnStoppedMethods()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, reportingTask, configurationContext);
+ future.complete(null);
}
}
}
};
componentLifeCycleThreadPool.execute(unscheduleReportingTaskRunnable);
+ return future;
}
/**
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
index 1dbf3b5b38..5d2011fd57 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
@@ -62,7 +62,7 @@ import org.apache.nifi.groups.AbstractComponentScheduler;
import org.apache.nifi.groups.BundleUpdateStrategy;
import org.apache.nifi.groups.ComponentIdGenerator;
import org.apache.nifi.groups.ComponentScheduler;
-import org.apache.nifi.groups.GroupSynchronizationOptions;
+import org.apache.nifi.groups.FlowSynchronizationOptions;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.parameter.Parameter;
@@ -332,7 +332,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
allTemplates.forEach(template -> template.getProcessGroup().removeTemplate(template));
// Synchronize the root group
- final GroupSynchronizationOptions syncOptions = new GroupSynchronizationOptions.Builder()
+ final FlowSynchronizationOptions syncOptions = new FlowSynchronizationOptions.Builder()
.componentIdGenerator(componentIdGenerator)
.componentScheduler(componentScheduler)
.ignoreLocalModifications(true)
@@ -1021,5 +1021,9 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
protected void enableNow(final Collection<ControllerServiceNode> controllerServices) {
flowController.getControllerServiceProvider().enableControllerServices(controllerServices);
}
+
+ /**
+ * Starts the given Reporting Task by delegating to the FlowController.
+ *
+ * @param reportingTask the Reporting Task to start
+ */
+ protected void startNow(final ReportingTaskNode reportingTask) {
+ flowController.startReportingTask(reportingTask);
+ }
}
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
index 2a2e641bcf..35657d0d3f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
@@ -39,7 +39,7 @@ import org.apache.nifi.groups.DataValve;
import org.apache.nifi.groups.FlowFileConcurrency;
import org.apache.nifi.groups.FlowFileGate;
import org.apache.nifi.groups.FlowFileOutboundPolicy;
-import org.apache.nifi.groups.GroupSynchronizationOptions;
+import org.apache.nifi.groups.FlowSynchronizationOptions;
import org.apache.nifi.groups.NoOpBatchCounts;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.ProcessGroupCounts;
@@ -719,7 +719,7 @@ public class MockProcessGroup implements ProcessGroup {
}
@Override
- public void synchronizeFlow(final VersionedExternalFlow proposedSnapshot, final GroupSynchronizationOptions synchronizationOptions, final FlowMappingOptions flowMappingOptions) {
+ public void synchronizeFlow(final VersionedExternalFlow proposedSnapshot, final FlowSynchronizationOptions synchronizationOptions, final FlowMappingOptions flowMappingOptions) {
+ // No-op: this mock implementation performs no synchronization.
}
@Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
index 055d0251ef..3210347b22 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
@@ -243,7 +243,7 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
if (ScheduledState.RUNNING.equals(scheduledState)) {
return serviceProvider.scheduleReferencingComponents(controllerService);
} else {
- return serviceProvider.unscheduleReferencingComponents(controllerService);
+ return serviceProvider.unscheduleReferencingComponents(controllerService).keySet();
}
}
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingTaskNode.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingTaskNode.java
index 5a24f371f7..fdcc874420 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingTaskNode.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingTaskNode.java
@@ -56,6 +56,24 @@ public class StatelessReportingTaskNode extends AbstractReportingTaskNode implem
return new StatelessReportingContext(statelessEngine, flowManager, getEffectivePropertyValues(), getReportingTask(), getVariableRegistry(), getParameterLookup());
}
+ /**
+ * Not supported by this implementation.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public void start() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Not supported by this implementation.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public void stop() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void enable() {
+ // No-op: enabling performs no action in this implementation.
+ }
+
+ @Override
+ public void disable() {
+ // No-op: disabling performs no action in this implementation.
+ }
+
@Override
public Class<?> getComponentClass() {
return null;
@@ -80,4 +98,6 @@ public class StatelessReportingTaskNode extends AbstractReportingTaskNode implem
public Resource getResource() {
return null;
}
+
+
}
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
index 5b32d2882d..b47b43a483 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
@@ -245,7 +245,8 @@ public class StatelessProcessScheduler implements ProcessScheduler {
}
@Override
- public void unschedule(final ReportingTaskNode taskNode) {
+ public Future<Void> unschedule(final ReportingTaskNode taskNode) {
+ // No unscheduling work is performed here, so hand back an already-completed Future.
+ return CompletableFuture.completedFuture(null);
}
@Override