You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2022/05/13 19:09:33 UTC

[nifi] branch main updated: NIFI-10001: When enabling a collection of Controller Services, change… (#6042)

This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 8031b62351 NIFI-10001: When enabling a collection of Controller Services, change… (#6042)
8031b62351 is described below

commit 8031b6235145963c6c8c30d136a9bb9061d625a1
Author: markap14 <ma...@hotmail.com>
AuthorDate: Fri May 13 15:09:23 2022 -0400

    NIFI-10001: When enabling a collection of Controller Services, change… (#6042)
    
    * NIFI-10001: Changed the logic used when enabling a collection of Controller Services. Instead of enabling dependent services and waiting for them to complete enablement before starting a given service, just start the given services. The previous logic was necessary long ago because we couldn't enable a service unless all dependent services were fully enabled. But that changed a while ago. Now, we can enable a service when it's invalid. It'll just keep trying to enable until it becomes valid [...]
    
    * NIFI-10001: Restored previous implementation for StandardControllerServiceProvider, as the changes were not ultimately what we needed. Changed StandardProcessGroup to use a ConcurrentHashMap for controller services instead of a HashMap guarded by the readLock. The read lock was causing a deadlock when enabling a Controller Service that references another service during flow synchronization: Flow Synchronization was happening within a write lock, and enabling the service required a read lock on the gro [...]
---
 .../service/StandardControllerServiceProvider.java |  8 ++---
 .../StandardVersionedComponentSynchronizer.java    | 21 +++++++++----
 .../apache/nifi/groups/StandardProcessGroup.java   | 19 +++---------
 .../apache/nifi/web/StandardNiFiServiceFacade.java |  2 ++
 .../system/clustering/FlowSynchronizationIT.java   | 34 ++++++++++++++++++++++
 5 files changed, 59 insertions(+), 25 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 3e7cfdf5aa..a42fbbb569 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -470,11 +470,9 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
             return (rootServiceNode == null) ? null : rootServiceNode.getProxiedControllerService();
         }
 
-        final Set<ControllerServiceNode> servicesForGroup = groupOfInterest.getControllerServices(true);
-        for (final ControllerServiceNode serviceNode : servicesForGroup) {
-            if (serviceIdentifier.equals(serviceNode.getIdentifier())) {
-                return serviceNode.getProxiedControllerService();
-            }
+        final ControllerServiceNode serviceNode = groupOfInterest.findControllerService(serviceIdentifier, false, true);
+        if (serviceNode != null) {
+            return serviceNode.getProxiedControllerService();
         }
 
         return null;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
index d498d5fe38..98ab1b3dd0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
@@ -183,6 +183,12 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
             if (FlowDifferenceFilters.isScheduledStateNew(diff)) {
                 continue;
             }
+            // If the difference type is a Scheduled State Change, we want to ignore it, because we are just trying to
+            // find components that need to be stopped in order to be updated. We don't need to stop a component in order
+            // to change its Scheduled State.
+            if (diff.getDifferenceType() == DifferenceType.SCHEDULED_STATE_CHANGED) {
+                continue;
+            }
 
             // If this update adds a new Controller Service, then we need to check if the service already exists at a higher level
             // and if so compare our VersionedControllerService to the existing service.
@@ -214,12 +220,17 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
         }
 
         if (LOG.isInfoEnabled()) {
-            final String differencesByLine = flowComparison.getDifferences().stream()
-                .map(FlowDifference::toString)
-                .collect(Collectors.joining("\n"));
+            final Set<FlowDifference> differences = flowComparison.getDifferences();
+            if (differences.isEmpty()) {
+                LOG.info("No differences between current flow and proposed flow for {}", group);
+            } else {
+                final String differencesByLine = differences.stream()
+                    .map(FlowDifference::toString)
+                    .collect(Collectors.joining("\n"));
 
-            LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", group, versionedExternalFlow,
-                flowComparison.getDifferences().size(), differencesByLine);
+                LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", group, versionedExternalFlow,
+                    differences.size(), differencesByLine);
+            }
         }
 
         final Set<String> knownVariables = getKnownVariableNames(group);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 8960cefad4..77830d323b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -135,6 +135,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -186,7 +187,7 @@ public final class StandardProcessGroup implements ProcessGroup {
     private final Map<String, RemoteProcessGroup> remoteGroups = new HashMap<>();
     private final Map<String, ProcessorNode> processors = new HashMap<>();
     private final Map<String, Funnel> funnels = new HashMap<>();
-    private final Map<String, ControllerServiceNode> controllerServices = new HashMap<>();
+    private final Map<String, ControllerServiceNode> controllerServices = new ConcurrentHashMap<>();
     private final Map<String, Template> templates = new HashMap<>();
     private final PropertyEncryptor encryptor;
     private final MutableVariableRegistry variableRegistry;
@@ -2313,24 +2314,12 @@ public final class StandardProcessGroup implements ProcessGroup {
 
     @Override
     public ControllerServiceNode getControllerService(final String id) {
-        readLock.lock();
-        try {
-            return controllerServices.get(requireNonNull(id));
-        } finally {
-            readLock.unlock();
-        }
+        return controllerServices.get(requireNonNull(id));
     }
 
     @Override
     public Set<ControllerServiceNode> getControllerServices(final boolean recursive) {
-        final Set<ControllerServiceNode> services = new HashSet<>();
-
-        readLock.lock();
-        try {
-            services.addAll(controllerServices.values());
-        } finally {
-            readLock.unlock();
-        }
+        final Set<ControllerServiceNode> services = new HashSet<>(controllerServices.values());
 
         if (recursive) {
             final ProcessGroup parentGroup = parent.get();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 4cafa8429f..d3409fdde0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -2890,6 +2890,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
                     @Override
                     public RevisionUpdate<ControllerServiceReferencingComponentsEntity> update() {
                         final Set<ComponentNode> updated = controllerServiceDAO.updateControllerServiceReferencingComponents(controllerServiceId, scheduledState, controllerServiceState);
+                        controllerFacade.save();
+
                         final ControllerServiceReference updatedReference = controllerServiceDAO.getControllerService(controllerServiceId).getReferences();
 
                         // get the revisions of the updated components
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
index 7a5d89e1a2..faf1409b9f 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
@@ -873,6 +873,40 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
         });
     }
 
+    @Test
+    public void testRejoinAfterControllerServiceEnabled() throws NiFiClientException, IOException, InterruptedException {
+        final ControllerServiceEntity controllerService = getClientUtil().createControllerService("StandardCountService");
+        disconnectNode(2);
+
+        getClientUtil().enableControllerService(controllerService);
+        reconnectNode(2);
+        waitForAllNodesConnected();
+
+        switchClientToNode(2);
+        waitFor(() -> {
+            final ControllerServiceEntity currentService = getNifiClient().getControllerServicesClient(DO_NOT_REPLICATE).getControllerService(controllerService.getId());
+            return ControllerServiceState.ENABLED.name().equals(currentService.getComponent().getState());
+        });
+    }
+
+    @Test
+    public void testRejoinAfterControllerServiceDisabled() throws NiFiClientException, IOException, InterruptedException {
+        final ControllerServiceEntity controllerService = getClientUtil().createControllerService("StandardCountService");
+        getClientUtil().enableControllerService(controllerService);
+
+        disconnectNode(2);
+        getClientUtil().disableControllerService(controllerService);
+
+        reconnectNode(2);
+        waitForAllNodesConnected();
+
+        switchClientToNode(2);
+        waitFor(() -> {
+            final ControllerServiceEntity currentService = getNifiClient().getControllerServicesClient(DO_NOT_REPLICATE).getControllerService(controllerService.getId());
+            return ControllerServiceState.DISABLED.name().equals(currentService.getComponent().getState());
+        });
+    }
+
 
     private VersionedDataflow getNode2Flow() throws IOException {
         final File instanceDir = getNiFiInstance().getNodeInstance(2).getInstanceDirectory();