You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2022/05/20 19:32:35 UTC

[nifi] 03/06: NIFI-10001: When enabling a collection of Controller Services, change… (#6042)

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.16
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 0b7d11776b786a6e73468d48252f8dc47cef6662
Author: markap14 <ma...@hotmail.com>
AuthorDate: Fri May 13 15:09:23 2022 -0400

    NIFI-10001: When enabling a collection of Controller Services, change… (#6042)
    
    * NIFI-10001: When enabling a collection of Controller Services, changed logic. Instead of enabling dependent services and waiting for them to complete enablement before starting a given service, just start the services given. The previous logic was necessary long ago because we couldn't enable a service unless all dependent services were fully enabled. But that changed a while ago. Now, we can enable a service when it's invalid. It'll just keep trying to enable until it becomes valid [...]
    
    * NIFI-10001: Restored previous implementation for StandardControllerServiceProvider, as the changes were not ultimately what we needed. Changed StandardProcessGroup to use a ConcurrentHashMap for controller services instead of a HashMap with readLock. This was causing a deadlock when we enable a Controller Service that references another service during flow synchronization. Flow Synchronization was happening within a write lock and enabling the service required a read lock on the gro [...]
---
 .../service/StandardControllerServiceProvider.java |  8 ++---
 .../apache/nifi/groups/StandardProcessGroup.java   | 19 +++---------
 .../groups/StandardProcessGroupSynchronizer.java   | 21 +++++++++----
 .../apache/nifi/web/StandardNiFiServiceFacade.java |  2 ++
 .../system/clustering/FlowSynchronizationIT.java   | 34 ++++++++++++++++++++++
 5 files changed, 59 insertions(+), 25 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index a8e032a73a..592d096c8b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -449,11 +449,9 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
             return (rootServiceNode == null) ? null : rootServiceNode.getProxiedControllerService();
         }
 
-        final Set<ControllerServiceNode> servicesForGroup = groupOfInterest.getControllerServices(true);
-        for (final ControllerServiceNode serviceNode : servicesForGroup) {
-            if (serviceIdentifier.equals(serviceNode.getIdentifier())) {
-                return serviceNode.getProxiedControllerService();
-            }
+        final ControllerServiceNode serviceNode = groupOfInterest.findControllerService(serviceIdentifier, false, true);
+        if (serviceNode != null) {
+            return serviceNode.getProxiedControllerService();
         }
 
         return null;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 1924f9bcfb..33fb77c620 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -133,6 +133,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -184,7 +185,7 @@ public final class StandardProcessGroup implements ProcessGroup {
     private final Map<String, RemoteProcessGroup> remoteGroups = new HashMap<>();
     private final Map<String, ProcessorNode> processors = new HashMap<>();
     private final Map<String, Funnel> funnels = new HashMap<>();
-    private final Map<String, ControllerServiceNode> controllerServices = new HashMap<>();
+    private final Map<String, ControllerServiceNode> controllerServices = new ConcurrentHashMap<>();
     private final Map<String, Template> templates = new HashMap<>();
     private final PropertyEncryptor encryptor;
     private final MutableVariableRegistry variableRegistry;
@@ -2311,24 +2312,12 @@ public final class StandardProcessGroup implements ProcessGroup {
 
     @Override
     public ControllerServiceNode getControllerService(final String id) {
-        readLock.lock();
-        try {
-            return controllerServices.get(requireNonNull(id));
-        } finally {
-            readLock.unlock();
-        }
+        return controllerServices.get(requireNonNull(id));
     }
 
     @Override
     public Set<ControllerServiceNode> getControllerServices(final boolean recursive) {
-        final Set<ControllerServiceNode> services = new HashSet<>();
-
-        readLock.lock();
-        try {
-            services.addAll(controllerServices.values());
-        } finally {
-            readLock.unlock();
-        }
+        final Set<ControllerServiceNode> services = new HashSet<>(controllerServices.values());
 
         if (recursive) {
             final ProcessGroup parentGroup = parent.get();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
index 8c5592dc18..fc3eb383d0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
@@ -165,6 +165,12 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
             if (FlowDifferenceFilters.isScheduledStateNew(diff)) {
                 continue;
             }
+            // If the difference type is a Scheduled State Change, we want to ignore it, because we are just trying to
+            // find components that need to be stopped in order to be updated. We don't need to stop a component in order
+            // to change its Scheduled State.
+            if (diff.getDifferenceType() == DifferenceType.SCHEDULED_STATE_CHANGED) {
+                continue;
+            }
 
             // If this update adds a new Controller Service, then we need to check if the service already exists at a higher level
             // and if so compare our VersionedControllerService to the existing service.
@@ -196,12 +202,17 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
         }
 
         if (LOG.isInfoEnabled()) {
-            final String differencesByLine = flowComparison.getDifferences().stream()
-                .map(FlowDifference::toString)
-                .collect(Collectors.joining("\n"));
+            final Set<FlowDifference> differences = flowComparison.getDifferences();
+            if (differences.isEmpty()) {
+                LOG.info("No differences between current flow and proposed flow for {}", group);
+            } else {
+                final String differencesByLine = differences.stream()
+                    .map(FlowDifference::toString)
+                    .collect(Collectors.joining("\n"));
 
-            LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", group, versionedExternalFlow,
-                flowComparison.getDifferences().size(), differencesByLine);
+                LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", group, versionedExternalFlow,
+                    differences.size(), differencesByLine);
+            }
         }
 
         final Set<String> knownVariables = getKnownVariableNames(group);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 4cafa8429f..d3409fdde0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -2890,6 +2890,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
                     @Override
                     public RevisionUpdate<ControllerServiceReferencingComponentsEntity> update() {
                         final Set<ComponentNode> updated = controllerServiceDAO.updateControllerServiceReferencingComponents(controllerServiceId, scheduledState, controllerServiceState);
+                        controllerFacade.save();
+
                         final ControllerServiceReference updatedReference = controllerServiceDAO.getControllerService(controllerServiceId).getReferences();
 
                         // get the revisions of the updated components
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
index 7a5d89e1a2..faf1409b9f 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
@@ -873,6 +873,40 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
         });
     }
 
+    @Test
+    public void testRejoinAfterControllerServiceEnabled() throws NiFiClientException, IOException, InterruptedException {
+        final ControllerServiceEntity controllerService = getClientUtil().createControllerService("StandardCountService");
+        disconnectNode(2);
+
+        getClientUtil().enableControllerService(controllerService);
+        reconnectNode(2);
+        waitForAllNodesConnected();
+
+        switchClientToNode(2);
+        waitFor(() -> {
+            final ControllerServiceEntity currentService = getNifiClient().getControllerServicesClient(DO_NOT_REPLICATE).getControllerService(controllerService.getId());
+            return ControllerServiceState.ENABLED.name().equals(currentService.getComponent().getState());
+        });
+    }
+
+    @Test
+    public void testRejoinAfterControllerServiceDisabled() throws NiFiClientException, IOException, InterruptedException {
+        final ControllerServiceEntity controllerService = getClientUtil().createControllerService("StandardCountService");
+        getClientUtil().enableControllerService(controllerService);
+
+        disconnectNode(2);
+        getClientUtil().disableControllerService(controllerService);
+
+        reconnectNode(2);
+        waitForAllNodesConnected();
+
+        switchClientToNode(2);
+        waitFor(() -> {
+            final ControllerServiceEntity currentService = getNifiClient().getControllerServicesClient(DO_NOT_REPLICATE).getControllerService(controllerService.getId());
+            return ControllerServiceState.DISABLED.name().equals(currentService.getComponent().getState());
+        });
+    }
+
 
     private VersionedDataflow getNode2Flow() throws IOException {
         final File instanceDir = getNiFiInstance().getNodeInstance(2).getInstanceDirectory();