You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jg...@apache.org on 2022/02/25 19:29:59 UTC

[nifi] branch main updated: NIFI-9729: When restarting components in the VersionedFlowSynchronizer, first filter out any components that are intended to be stopped.

This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 6cea5ea  NIFI-9729: When restarting components in the VersionedFlowSynchronizer, first filter out any components that are intended to be stopped.
6cea5ea is described below

commit 6cea5ea520f48c5d05a30df93fa582cf42d3438e
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Feb 25 11:32:27 2022 -0500

    NIFI-9729: When restarting components in the VersionedFlowSynchronizer, first filter out any components that are intended to be stopped.
    
    Signed-off-by: Joe Gresock <jg...@gmail.com>
    
    This closes #5806.
---
 .../nifi/groups/ProcessGroupSynchronizer.java      |   6 ++
 .../groups/StandardProcessGroupSynchronizer.java   |  55 +++++++---
 .../serialization/AffectedComponentSet.java        |  10 ++
 .../serialization/ComponentSetFilter.java          |  81 ++++++++++++++
 .../serialization/RunningComponentSetFilter.java   | 117 +++++++++++++++++++++
 .../serialization/VersionedFlowSynchronizer.java   |   7 +-
 .../system/clustering/FlowSynchronizationIT.java   |  37 +++++++
 7 files changed, 298 insertions(+), 15 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizer.java
index 4f7a309..688ea1c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizer.java
@@ -23,6 +23,12 @@ import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
 
 public interface ProcessGroupSynchronizer {
 
+    /**
+     * Synchronize the given Process Group to match the proposed snaphsot
+     * @param group the Process Group to update
+     * @param proposedSnapshot the proposed/desired state for the process group
+     * @param synchronizationOptions options for how to synchronize the group
+     */
     void synchronize(ProcessGroup group, VersionedFlowSnapshot proposedSnapshot, GroupSynchronizationOptions synchronizationOptions) throws ProcessorInstantiationException;
 
     void verifyCanSynchronize(ProcessGroup group, VersionedProcessGroup proposed, boolean verifyConnectionRemoval);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
index 39004a5..7ec6231 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
@@ -108,6 +108,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -1696,33 +1697,59 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
 
         if (proposed.getInputPorts() != null) {
             for (final VersionedRemoteGroupPort port : proposed.getInputPorts()) {
-                if (port.getScheduledState() != org.apache.nifi.flow.ScheduledState.RUNNING) {
-                    continue;
-                }
-
-                final String portId = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), rpg.getIdentifier());
-                final RemoteGroupPort remoteGroupPort = rpg.getInputPort(portId);
+                final RemoteGroupPort remoteGroupPort = getRpgInputPort(port, rpg, componentIdGenerator);
                 if (remoteGroupPort != null) {
-                    context.getComponentScheduler().startComponent(remoteGroupPort);
+                    synchronizeTransmissionState(port, remoteGroupPort);
                 }
             }
         }
 
         if (proposed.getOutputPorts() != null) {
             for (final VersionedRemoteGroupPort port : proposed.getOutputPorts()) {
-                if (port.getScheduledState() != org.apache.nifi.flow.ScheduledState.RUNNING) {
-                    continue;
-                }
-
-                final String portId = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), rpg.getIdentifier());
-                final RemoteGroupPort remoteGroupPort = rpg.getOutputPort(portId);
+                final RemoteGroupPort remoteGroupPort = getRpgOutputPort(port, rpg, componentIdGenerator);
                 if (remoteGroupPort != null) {
-                    context.getComponentScheduler().startComponent(remoteGroupPort);
+                    synchronizeTransmissionState(port, remoteGroupPort);
                 }
             }
         }
     }
 
+    private RemoteGroupPort getRpgInputPort(final VersionedRemoteGroupPort port, final RemoteProcessGroup rpg, final ComponentIdGenerator componentIdGenerator) {
+        return getRpgPort(port, rpg, componentIdGenerator, RemoteProcessGroup::getInputPort);
+    }
+
+    private RemoteGroupPort getRpgOutputPort(final VersionedRemoteGroupPort port, final RemoteProcessGroup rpg, final ComponentIdGenerator componentIdGenerator) {
+        return getRpgPort(port, rpg, componentIdGenerator, RemoteProcessGroup::getOutputPort);
+    }
+
+    private RemoteGroupPort getRpgPort(final VersionedRemoteGroupPort port, final RemoteProcessGroup rpg, final ComponentIdGenerator componentIdGenerator,
+                                       final BiFunction<RemoteProcessGroup, String, RemoteGroupPort> portLookup) {
+        final String instanceId = port.getInstanceIdentifier();
+        if (instanceId != null) {
+            final RemoteGroupPort remoteGroupPort = portLookup.apply(rpg, instanceId);
+            if (remoteGroupPort != null) {
+                return remoteGroupPort;
+            }
+        }
+
+        final String portId = componentIdGenerator.generateUuid(port.getIdentifier(), port.getInstanceIdentifier(), rpg.getIdentifier());
+        final RemoteGroupPort remoteGroupPort = portLookup.apply(rpg, portId);
+        return remoteGroupPort;
+    }
+
+    private void synchronizeTransmissionState(final VersionedRemoteGroupPort versionedPort, final RemoteGroupPort remoteGroupPort) {
+        final ScheduledState portState = remoteGroupPort.getScheduledState();
+
+        if (versionedPort.getScheduledState() == org.apache.nifi.flow.ScheduledState.RUNNING) {
+            if (portState != ScheduledState.RUNNING) {
+                context.getComponentScheduler().startComponent(remoteGroupPort);
+            }
+        } else {
+            if (portState == ScheduledState.RUNNING) {
+                remoteGroupPort.getRemoteProcessGroup().stopTransmitting(remoteGroupPort);
+            }
+        }
+    }
 
     private RemoteProcessGroupPortDescriptor createPortDescriptor(final VersionedRemoteGroupPort proposed, final ComponentIdGenerator componentIdGenerator, final String rpgId) {
         final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java
index b62881e..d0b7970 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java
@@ -482,6 +482,16 @@ public class AffectedComponentSet {
         reportingTasks.forEach(flowController::startReportingTask);
     }
 
+    public void removeComponents(final ComponentSetFilter filter) {
+        inputPorts.removeIf(filter::testInputPort);
+        outputPorts.removeIf(filter::testOutputPort);
+        remoteInputPorts.removeIf(filter::testRemoteInputPort);
+        remoteOutputPorts.removeIf(filter::testRemoteOutputPort);
+        processors.removeIf(filter::testProcessor);
+        controllerServices.removeIf(filter::testControllerService);
+        reportingTasks.removeIf(filter::testReportingTask);
+    }
+
     /**
      * Returns a new AffectedComponentSet that represents only those components that currently exist within the NiFi instance. When a set of dataflow updates have occurred, it is very possible
      * that one or more components referred to by the AffectedComponentSet no longer exist (for example, there was a dataflow update that removed a Processor, so that Processor no longer exists).
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ComponentSetFilter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ComponentSetFilter.java
new file mode 100644
index 0000000..6edc155
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ComponentSetFilter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.serialization;
+
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.remote.RemoteGroupPort;
+
+public interface ComponentSetFilter {
+    boolean testProcessor(ProcessorNode processor);
+
+    boolean testReportingTask(ReportingTaskNode reportingTask);
+
+    boolean testControllerService(ControllerServiceNode controllerService);
+
+    boolean testInputPort(Port port);
+
+    boolean testOutputPort(Port port);
+
+    boolean testRemoteInputPort(RemoteGroupPort port);
+
+    boolean testRemoteOutputPort(RemoteGroupPort port);
+
+    default ComponentSetFilter reverse() {
+        final ComponentSetFilter original = this;
+
+        return new ComponentSetFilter() {
+            @Override
+            public boolean testProcessor(final ProcessorNode processor) {
+                return !original.testProcessor(processor);
+            }
+
+            @Override
+            public boolean testReportingTask(final ReportingTaskNode reportingTask) {
+                return !original.testReportingTask(reportingTask);
+            }
+
+            @Override
+            public boolean testControllerService(final ControllerServiceNode controllerService) {
+                return !original.testControllerService(controllerService);
+            }
+
+            @Override
+            public boolean testInputPort(final Port port) {
+                return !original.testInputPort(port);
+            }
+
+            @Override
+            public boolean testOutputPort(final Port port) {
+                return !original.testOutputPort(port);
+            }
+
+            @Override
+            public boolean testRemoteInputPort(final RemoteGroupPort port) {
+                return !original.testRemoteInputPort(port);
+            }
+
+            @Override
+            public boolean testRemoteOutputPort(final RemoteGroupPort port) {
+                return !original.testRemoteOutputPort(port);
+            }
+        };
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/RunningComponentSetFilter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/RunningComponentSetFilter.java
new file mode 100644
index 0000000..c141144
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/RunningComponentSetFilter.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.serialization;
+
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.flow.ScheduledState;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedPort;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flow.VersionedRemoteGroupPort;
+import org.apache.nifi.flow.VersionedRemoteProcessGroup;
+import org.apache.nifi.flow.VersionedReportingTask;
+import org.apache.nifi.remote.RemoteGroupPort;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class RunningComponentSetFilter implements ComponentSetFilter {
+    private final Map<String, VersionedControllerService> controllerServices = new HashMap<>();
+    private final Map<String, VersionedProcessor> processors = new HashMap<>();
+    private final Map<String, VersionedReportingTask> reportingTasks = new HashMap<>();
+    private final Map<String, VersionedPort> inputPorts = new HashMap<>();
+    private final Map<String, VersionedPort> outputPorts = new HashMap<>();
+    private final Map<String, VersionedRemoteGroupPort> remoteInputPorts = new HashMap<>();
+    private final Map<String, VersionedRemoteGroupPort> remoteOutputPorts = new HashMap<>();
+
+    public RunningComponentSetFilter(final VersionedDataflow dataflow) {
+        dataflow.getControllerServices().forEach(service -> controllerServices.put(service.getInstanceIdentifier(), service));
+        dataflow.getReportingTasks().forEach(task -> reportingTasks.put(task.getInstanceIdentifier(), task));
+        flatten(dataflow.getRootGroup());
+    }
+
+    private void flatten(final VersionedProcessGroup group) {
+        group.getInputPorts().forEach(port -> inputPorts.put(port.getInstanceIdentifier(), port));
+        group.getOutputPorts().forEach(port -> outputPorts.put(port.getInstanceIdentifier(), port));
+        group.getControllerServices().forEach(service -> controllerServices.put(service.getInstanceIdentifier(), service));
+        group.getProcessors().forEach(processor -> processors.put(processor.getInstanceIdentifier(), processor));
+
+        for (final VersionedRemoteProcessGroup rpg : group.getRemoteProcessGroups()) {
+            rpg.getInputPorts().forEach(port -> {
+                if (port.getInstanceIdentifier() != null) {
+                    remoteInputPorts.put(port.getInstanceIdentifier(), port);
+                }
+            });
+
+            rpg.getOutputPorts().forEach(port -> {
+                if (port.getInstanceIdentifier() != null) {
+                    remoteOutputPorts.put(port.getInstanceIdentifier(), port);
+                }
+            });
+        }
+
+        group.getProcessGroups().forEach(this::flatten);
+    }
+
+    @Override
+    public boolean testProcessor(final ProcessorNode processor) {
+        final VersionedProcessor versionedProcessor = processors.get(processor.getIdentifier());
+        return versionedProcessor != null && versionedProcessor.getScheduledState() == ScheduledState.RUNNING;
+    }
+
+    @Override
+    public boolean testReportingTask(final ReportingTaskNode reportingTask) {
+        final VersionedReportingTask versionedReportingTask = reportingTasks.get(reportingTask.getIdentifier());
+        return versionedReportingTask != null && versionedReportingTask.getScheduledState() == ScheduledState.RUNNING;
+    }
+
+    @Override
+    public boolean testControllerService(final ControllerServiceNode controllerService) {
+        final VersionedControllerService versionedService = controllerServices.get(controllerService.getIdentifier());
+        return versionedService != null && versionedService.getScheduledState() == ScheduledState.ENABLED;
+    }
+
+    @Override
+    public boolean testInputPort(final Port port) {
+        final VersionedPort versionedPort = inputPorts.get(port.getIdentifier());
+        return versionedPort != null && versionedPort.getScheduledState() == ScheduledState.RUNNING;
+    }
+
+    @Override
+    public boolean testOutputPort(final Port port) {
+        final VersionedPort versionedPort = outputPorts.get(port.getIdentifier());
+        return versionedPort != null && versionedPort.getScheduledState() == ScheduledState.RUNNING;
+    }
+
+    @Override
+    public boolean testRemoteInputPort(final RemoteGroupPort port) {
+        final VersionedRemoteGroupPort versionedPort = remoteInputPorts.get(port.getIdentifier());
+        return versionedPort != null && versionedPort.getScheduledState() == ScheduledState.RUNNING;
+    }
+
+    @Override
+    public boolean testRemoteOutputPort(final RemoteGroupPort port) {
+        final VersionedRemoteGroupPort versionedPort = remoteOutputPorts.get(port.getIdentifier());
+        return versionedPort != null && versionedPort.getScheduledState() == ScheduledState.RUNNING;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
index cd2fef0..2262103 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
@@ -186,7 +186,12 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
         } finally {
             // We have to call toExistingSet() here because some of the components that existed in the active set may no longer exist,
             // so attempting to start them will fail.
-            activeSet.toExistingSet().toStartableSet().start();
+            final AffectedComponentSet startable = activeSet.toExistingSet().toStartableSet();
+
+            final ComponentSetFilter runningComponentFilter = new RunningComponentSetFilter(proposedFlow.getVersionedDataflow());
+            final ComponentSetFilter stoppedComponentFilter = runningComponentFilter.reverse();
+            startable.removeComponents(stoppedComponentFilter);
+            startable.start();
         }
 
         final long millis = System.currentTimeMillis() - start;
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
index b7dcad1..bba6061 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
@@ -437,6 +437,43 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
         waitFor(() -> isNodeDisconnectedDueToMissingConnection(5672, connection.getId()));
     }
 
+    @Test
+    public void testComponentStatesRestoredOnReconnect() throws NiFiClientException, IOException, InterruptedException {
+        final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
+        final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
+        final ConnectionEntity connection = getClientUtil().createConnection(generate, terminate, "success");
+
+        getClientUtil().startProcessor(generate);
+        waitForQueueCount(connection.getId(), 2);
+
+        // Shut down node 2
+        disconnectNode(2);
+
+        getClientUtil().stopProcessor(generate);
+        getClientUtil().startProcessor(terminate);
+
+        waitForQueueCount(connection.getId(), 0);
+
+        reconnectNode(2);
+        waitForAllNodesConnected();
+
+        getClientUtil().waitForStoppedProcessor(generate.getId());
+        waitForQueueCount(connection.getId(), 0);
+
+        switchClientToNode(2);
+
+        // Ensure that Node 2 has the correct state for each processor.
+        waitFor(() -> {
+            final ProcessorEntity latestTerminate = getNifiClient().getProcessorClient(DO_NOT_REPLICATE).getProcessor(terminate.getId());
+            return "RUNNING".equalsIgnoreCase(latestTerminate.getComponent().getState());
+        });
+
+        waitFor(() -> {
+            final ProcessorEntity latestGenerate = getNifiClient().getProcessorClient(DO_NOT_REPLICATE).getProcessor(generate.getId());
+            return "STOPPED".equalsIgnoreCase(latestGenerate.getComponent().getState());
+        });
+    }
+
     private boolean isNodeDisconnectedDueToMissingConnection(final int nodeApiPort, final String connectionId) throws NiFiClientException, IOException {
         final NodeDTO node2Dto = getNifiClient().getControllerClient().getNodes().getCluster().getNodes().stream()
             .filter(dto -> dto.getApiPort() == nodeApiPort)