You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jg...@apache.org on 2022/02/25 19:29:59 UTC
[nifi] branch main updated: NIFI-9729: When restarting components in the VersionedFlowSynchronizer, first filter out any components that are intended to be stopped.
This is an automated email from the ASF dual-hosted git repository.
jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 6cea5ea NIFI-9729: When restarting components in the VersionedFlowSynchronizer, first filter out any components that are intended to be stopped.
6cea5ea is described below
commit 6cea5ea520f48c5d05a30df93fa582cf42d3438e
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Feb 25 11:32:27 2022 -0500
NIFI-9729: When restarting components in the VersionedFlowSynchronizer, first filter out any components that are intended to be stopped.
Signed-off-by: Joe Gresock <jg...@gmail.com>
This closes #5806.
---
.../nifi/groups/ProcessGroupSynchronizer.java | 6 ++
.../groups/StandardProcessGroupSynchronizer.java | 55 +++++++---
.../serialization/AffectedComponentSet.java | 10 ++
.../serialization/ComponentSetFilter.java | 81 ++++++++++++++
.../serialization/RunningComponentSetFilter.java | 117 +++++++++++++++++++++
.../serialization/VersionedFlowSynchronizer.java | 7 +-
.../system/clustering/FlowSynchronizationIT.java | 37 +++++++
7 files changed, 298 insertions(+), 15 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizer.java
index 4f7a309..688ea1c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizer.java
@@ -23,6 +23,12 @@ import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
public interface ProcessGroupSynchronizer {
+ /**
+ * Synchronize the given Process Group to match the proposed snaphsot
+ * @param group the Process Group to update
+ * @param proposedSnapshot the proposed/desired state for the process group
+ * @param synchronizationOptions options for how to synchronize the group
+ */
void synchronize(ProcessGroup group, VersionedFlowSnapshot proposedSnapshot, GroupSynchronizationOptions synchronizationOptions) throws ProcessorInstantiationException;
void verifyCanSynchronize(ProcessGroup group, VersionedProcessGroup proposed, boolean verifyConnectionRemoval);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
index 39004a5..7ec6231 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
@@ -108,6 +108,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -1696,33 +1697,59 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
if (proposed.getInputPorts() != null) {
for (final VersionedRemoteGroupPort port : proposed.getInputPorts()) {
- if (port.getScheduledState() != org.apache.nifi.flow.ScheduledState.RUNNING) {
- continue;
- }
-
- final String portId = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), rpg.getIdentifier());
- final RemoteGroupPort remoteGroupPort = rpg.getInputPort(portId);
+ final RemoteGroupPort remoteGroupPort = getRpgInputPort(port, rpg, componentIdGenerator);
if (remoteGroupPort != null) {
- context.getComponentScheduler().startComponent(remoteGroupPort);
+ synchronizeTransmissionState(port, remoteGroupPort);
}
}
}
if (proposed.getOutputPorts() != null) {
for (final VersionedRemoteGroupPort port : proposed.getOutputPorts()) {
- if (port.getScheduledState() != org.apache.nifi.flow.ScheduledState.RUNNING) {
- continue;
- }
-
- final String portId = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), rpg.getIdentifier());
- final RemoteGroupPort remoteGroupPort = rpg.getOutputPort(portId);
+ final RemoteGroupPort remoteGroupPort = getRpgOutputPort(port, rpg, componentIdGenerator);
if (remoteGroupPort != null) {
- context.getComponentScheduler().startComponent(remoteGroupPort);
+ synchronizeTransmissionState(port, remoteGroupPort);
}
}
}
}
+ private RemoteGroupPort getRpgInputPort(final VersionedRemoteGroupPort port, final RemoteProcessGroup rpg, final ComponentIdGenerator componentIdGenerator) {
+ return getRpgPort(port, rpg, componentIdGenerator, RemoteProcessGroup::getInputPort);
+ }
+
+ private RemoteGroupPort getRpgOutputPort(final VersionedRemoteGroupPort port, final RemoteProcessGroup rpg, final ComponentIdGenerator componentIdGenerator) {
+ return getRpgPort(port, rpg, componentIdGenerator, RemoteProcessGroup::getOutputPort);
+ }
+
+ private RemoteGroupPort getRpgPort(final VersionedRemoteGroupPort port, final RemoteProcessGroup rpg, final ComponentIdGenerator componentIdGenerator,
+ final BiFunction<RemoteProcessGroup, String, RemoteGroupPort> portLookup) {
+ final String instanceId = port.getInstanceIdentifier();
+ if (instanceId != null) {
+ final RemoteGroupPort remoteGroupPort = portLookup.apply(rpg, instanceId);
+ if (remoteGroupPort != null) {
+ return remoteGroupPort;
+ }
+ }
+
+ final String portId = componentIdGenerator.generateUuid(port.getIdentifier(), port.getInstanceIdentifier(), rpg.getIdentifier());
+ final RemoteGroupPort remoteGroupPort = portLookup.apply(rpg, portId);
+ return remoteGroupPort;
+ }
+
+ private void synchronizeTransmissionState(final VersionedRemoteGroupPort versionedPort, final RemoteGroupPort remoteGroupPort) {
+ final ScheduledState portState = remoteGroupPort.getScheduledState();
+
+ if (versionedPort.getScheduledState() == org.apache.nifi.flow.ScheduledState.RUNNING) {
+ if (portState != ScheduledState.RUNNING) {
+ context.getComponentScheduler().startComponent(remoteGroupPort);
+ }
+ } else {
+ if (portState == ScheduledState.RUNNING) {
+ remoteGroupPort.getRemoteProcessGroup().stopTransmitting(remoteGroupPort);
+ }
+ }
+ }
private RemoteProcessGroupPortDescriptor createPortDescriptor(final VersionedRemoteGroupPort proposed, final ComponentIdGenerator componentIdGenerator, final String rpgId) {
final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java
index b62881e..d0b7970 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java
@@ -482,6 +482,16 @@ public class AffectedComponentSet {
reportingTasks.forEach(flowController::startReportingTask);
}
+ public void removeComponents(final ComponentSetFilter filter) {
+ inputPorts.removeIf(filter::testInputPort);
+ outputPorts.removeIf(filter::testOutputPort);
+ remoteInputPorts.removeIf(filter::testRemoteInputPort);
+ remoteOutputPorts.removeIf(filter::testRemoteOutputPort);
+ processors.removeIf(filter::testProcessor);
+ controllerServices.removeIf(filter::testControllerService);
+ reportingTasks.removeIf(filter::testReportingTask);
+ }
+
/**
* Returns a new AffectedComponentSet that represents only those components that currently exist within the NiFi instance. When a set of dataflow updates have occurred, it is very possible
* that one or more components referred to by the AffectedComponentSet no longer exist (for example, there was a dataflow update that removed a Processor, so that Processor no longer exists).
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ComponentSetFilter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ComponentSetFilter.java
new file mode 100644
index 0000000..6edc155
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ComponentSetFilter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.serialization;
+
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.remote.RemoteGroupPort;
+
+public interface ComponentSetFilter {
+ boolean testProcessor(ProcessorNode processor);
+
+ boolean testReportingTask(ReportingTaskNode reportingTask);
+
+ boolean testControllerService(ControllerServiceNode controllerService);
+
+ boolean testInputPort(Port port);
+
+ boolean testOutputPort(Port port);
+
+ boolean testRemoteInputPort(RemoteGroupPort port);
+
+ boolean testRemoteOutputPort(RemoteGroupPort port);
+
+ default ComponentSetFilter reverse() {
+ final ComponentSetFilter original = this;
+
+ return new ComponentSetFilter() {
+ @Override
+ public boolean testProcessor(final ProcessorNode processor) {
+ return !original.testProcessor(processor);
+ }
+
+ @Override
+ public boolean testReportingTask(final ReportingTaskNode reportingTask) {
+ return !original.testReportingTask(reportingTask);
+ }
+
+ @Override
+ public boolean testControllerService(final ControllerServiceNode controllerService) {
+ return !original.testControllerService(controllerService);
+ }
+
+ @Override
+ public boolean testInputPort(final Port port) {
+ return !original.testInputPort(port);
+ }
+
+ @Override
+ public boolean testOutputPort(final Port port) {
+ return !original.testOutputPort(port);
+ }
+
+ @Override
+ public boolean testRemoteInputPort(final RemoteGroupPort port) {
+ return !original.testRemoteInputPort(port);
+ }
+
+ @Override
+ public boolean testRemoteOutputPort(final RemoteGroupPort port) {
+ return !original.testRemoteOutputPort(port);
+ }
+ };
+ }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/RunningComponentSetFilter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/RunningComponentSetFilter.java
new file mode 100644
index 0000000..c141144
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/RunningComponentSetFilter.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.serialization;
+
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.flow.ScheduledState;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedPort;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flow.VersionedRemoteGroupPort;
+import org.apache.nifi.flow.VersionedRemoteProcessGroup;
+import org.apache.nifi.flow.VersionedReportingTask;
+import org.apache.nifi.remote.RemoteGroupPort;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class RunningComponentSetFilter implements ComponentSetFilter {
+ private final Map<String, VersionedControllerService> controllerServices = new HashMap<>();
+ private final Map<String, VersionedProcessor> processors = new HashMap<>();
+ private final Map<String, VersionedReportingTask> reportingTasks = new HashMap<>();
+ private final Map<String, VersionedPort> inputPorts = new HashMap<>();
+ private final Map<String, VersionedPort> outputPorts = new HashMap<>();
+ private final Map<String, VersionedRemoteGroupPort> remoteInputPorts = new HashMap<>();
+ private final Map<String, VersionedRemoteGroupPort> remoteOutputPorts = new HashMap<>();
+
+ public RunningComponentSetFilter(final VersionedDataflow dataflow) {
+ dataflow.getControllerServices().forEach(service -> controllerServices.put(service.getInstanceIdentifier(), service));
+ dataflow.getReportingTasks().forEach(task -> reportingTasks.put(task.getInstanceIdentifier(), task));
+ flatten(dataflow.getRootGroup());
+ }
+
+ private void flatten(final VersionedProcessGroup group) {
+ group.getInputPorts().forEach(port -> inputPorts.put(port.getInstanceIdentifier(), port));
+ group.getOutputPorts().forEach(port -> outputPorts.put(port.getInstanceIdentifier(), port));
+ group.getControllerServices().forEach(service -> controllerServices.put(service.getInstanceIdentifier(), service));
+ group.getProcessors().forEach(processor -> processors.put(processor.getInstanceIdentifier(), processor));
+
+ for (final VersionedRemoteProcessGroup rpg : group.getRemoteProcessGroups()) {
+ rpg.getInputPorts().forEach(port -> {
+ if (port.getInstanceIdentifier() != null) {
+ remoteInputPorts.put(port.getInstanceIdentifier(), port);
+ }
+ });
+
+ rpg.getOutputPorts().forEach(port -> {
+ if (port.getInstanceIdentifier() != null) {
+ remoteOutputPorts.put(port.getInstanceIdentifier(), port);
+ }
+ });
+ }
+
+ group.getProcessGroups().forEach(this::flatten);
+ }
+
+ @Override
+ public boolean testProcessor(final ProcessorNode processor) {
+ final VersionedProcessor versionedProcessor = processors.get(processor.getIdentifier());
+ return versionedProcessor != null && versionedProcessor.getScheduledState() == ScheduledState.RUNNING;
+ }
+
+ @Override
+ public boolean testReportingTask(final ReportingTaskNode reportingTask) {
+ final VersionedReportingTask versionedReportingTask = reportingTasks.get(reportingTask.getIdentifier());
+ return versionedReportingTask != null && versionedReportingTask.getScheduledState() == ScheduledState.RUNNING;
+ }
+
+ @Override
+ public boolean testControllerService(final ControllerServiceNode controllerService) {
+ final VersionedControllerService versionedService = controllerServices.get(controllerService.getIdentifier());
+ return versionedService != null && versionedService.getScheduledState() == ScheduledState.ENABLED;
+ }
+
+ @Override
+ public boolean testInputPort(final Port port) {
+ final VersionedPort versionedPort = inputPorts.get(port.getIdentifier());
+ return versionedPort != null && versionedPort.getScheduledState() == ScheduledState.RUNNING;
+ }
+
+ @Override
+ public boolean testOutputPort(final Port port) {
+ final VersionedPort versionedPort = outputPorts.get(port.getIdentifier());
+ return versionedPort != null && versionedPort.getScheduledState() == ScheduledState.RUNNING;
+ }
+
+ @Override
+ public boolean testRemoteInputPort(final RemoteGroupPort port) {
+ final VersionedRemoteGroupPort versionedPort = remoteInputPorts.get(port.getIdentifier());
+ return versionedPort != null && versionedPort.getScheduledState() == ScheduledState.RUNNING;
+ }
+
+ @Override
+ public boolean testRemoteOutputPort(final RemoteGroupPort port) {
+ final VersionedRemoteGroupPort versionedPort = remoteOutputPorts.get(port.getIdentifier());
+ return versionedPort != null && versionedPort.getScheduledState() == ScheduledState.RUNNING;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
index cd2fef0..2262103 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
@@ -186,7 +186,12 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
} finally {
// We have to call toExistingSet() here because some of the components that existed in the active set may no longer exist,
// so attempting to start them will fail.
- activeSet.toExistingSet().toStartableSet().start();
+ final AffectedComponentSet startable = activeSet.toExistingSet().toStartableSet();
+
+ final ComponentSetFilter runningComponentFilter = new RunningComponentSetFilter(proposedFlow.getVersionedDataflow());
+ final ComponentSetFilter stoppedComponentFilter = runningComponentFilter.reverse();
+ startable.removeComponents(stoppedComponentFilter);
+ startable.start();
}
final long millis = System.currentTimeMillis() - start;
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
index b7dcad1..bba6061 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
@@ -437,6 +437,43 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
waitFor(() -> isNodeDisconnectedDueToMissingConnection(5672, connection.getId()));
}
+ @Test
+ public void testComponentStatesRestoredOnReconnect() throws NiFiClientException, IOException, InterruptedException {
+ final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
+ final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
+ final ConnectionEntity connection = getClientUtil().createConnection(generate, terminate, "success");
+
+ getClientUtil().startProcessor(generate);
+ waitForQueueCount(connection.getId(), 2);
+
+ // Shut down node 2
+ disconnectNode(2);
+
+ getClientUtil().stopProcessor(generate);
+ getClientUtil().startProcessor(terminate);
+
+ waitForQueueCount(connection.getId(), 0);
+
+ reconnectNode(2);
+ waitForAllNodesConnected();
+
+ getClientUtil().waitForStoppedProcessor(generate.getId());
+ waitForQueueCount(connection.getId(), 0);
+
+ switchClientToNode(2);
+
+ // Ensure that Node 2 has the correct state for each processor.
+ waitFor(() -> {
+ final ProcessorEntity latestTerminate = getNifiClient().getProcessorClient(DO_NOT_REPLICATE).getProcessor(terminate.getId());
+ return "RUNNING".equalsIgnoreCase(latestTerminate.getComponent().getState());
+ });
+
+ waitFor(() -> {
+ final ProcessorEntity latestGenerate = getNifiClient().getProcessorClient(DO_NOT_REPLICATE).getProcessor(generate.getId());
+ return "STOPPED".equalsIgnoreCase(latestGenerate.getComponent().getState());
+ });
+ }
+
private boolean isNodeDisconnectedDueToMissingConnection(final int nodeApiPort, final String connectionId) throws NiFiClientException, IOException {
final NodeDTO node2Dto = getNifiClient().getControllerClient().getNodes().getCluster().getNodes().stream()
.filter(dto -> dto.getApiPort() == nodeApiPort)