You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2022/05/13 19:09:33 UTC
[nifi] branch main updated: NIFI-10001: When enabling a collection of Controller Services, change… (#6042)
This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 8031b62351 NIFI-10001: When enabling a collection of Controller Services, change… (#6042)
8031b62351 is described below
commit 8031b6235145963c6c8c30d136a9bb9061d625a1
Author: markap14 <ma...@hotmail.com>
AuthorDate: Fri May 13 15:09:23 2022 -0400
NIFI-10001: When enabling a collection of Controller Services, change… (#6042)
* NIFI-10001: When enabling a collection of Controller Services, changed logic. Instead of enabling dependent services and waiting for them to complete enablement before starting a given service, just start the services given. The previous logic was necessary long ago because we couldn't enable a service unless all dependent services were fully enabled. But that changed a while ago. Now, we can enable a service when it's invalid. It'll just keep trying to enable until it becomes valid [...]
* NIFI-10001: Restored previous implementation for StandardControllerServiceProvider, as the changes were not ultimately what we needed. Changed StandardProcessGroup to use a ConcurrentHashMap for controller services instead of a HashMap with readLock. This was causing a deadlock when we enable a Controller Service that references another service during flow synchronization. Flow Synchronization was happening within a write lock and enabling the service required a read lock on the gro [...]
---
.../service/StandardControllerServiceProvider.java | 8 ++---
.../StandardVersionedComponentSynchronizer.java | 21 +++++++++----
.../apache/nifi/groups/StandardProcessGroup.java | 19 +++---------
.../apache/nifi/web/StandardNiFiServiceFacade.java | 2 ++
.../system/clustering/FlowSynchronizationIT.java | 34 ++++++++++++++++++++++
5 files changed, 59 insertions(+), 25 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 3e7cfdf5aa..a42fbbb569 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -470,11 +470,9 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
return (rootServiceNode == null) ? null : rootServiceNode.getProxiedControllerService();
}
- final Set<ControllerServiceNode> servicesForGroup = groupOfInterest.getControllerServices(true);
- for (final ControllerServiceNode serviceNode : servicesForGroup) {
- if (serviceIdentifier.equals(serviceNode.getIdentifier())) {
- return serviceNode.getProxiedControllerService();
- }
+ final ControllerServiceNode serviceNode = groupOfInterest.findControllerService(serviceIdentifier, false, true);
+ if (serviceNode != null) {
+ return serviceNode.getProxiedControllerService();
}
return null;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
index d498d5fe38..98ab1b3dd0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
@@ -183,6 +183,12 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
if (FlowDifferenceFilters.isScheduledStateNew(diff)) {
continue;
}
+ // If the difference type is a Scheduled State Change, we want to ignore it, because we are just trying to
+ // find components that need to be stopped in order to be updated. We don't need to stop a component in order
+ // to change its Scheduled State.
+ if (diff.getDifferenceType() == DifferenceType.SCHEDULED_STATE_CHANGED) {
+ continue;
+ }
// If this update adds a new Controller Service, then we need to check if the service already exists at a higher level
// and if so compare our VersionedControllerService to the existing service.
@@ -214,12 +220,17 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
}
if (LOG.isInfoEnabled()) {
- final String differencesByLine = flowComparison.getDifferences().stream()
- .map(FlowDifference::toString)
- .collect(Collectors.joining("\n"));
+ final Set<FlowDifference> differences = flowComparison.getDifferences();
+ if (differences.isEmpty()) {
+ LOG.info("No differences between current flow and proposed flow for {}", group);
+ } else {
+ final String differencesByLine = differences.stream()
+ .map(FlowDifference::toString)
+ .collect(Collectors.joining("\n"));
- LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", group, versionedExternalFlow,
- flowComparison.getDifferences().size(), differencesByLine);
+ LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", group, versionedExternalFlow,
+ differences.size(), differencesByLine);
+ }
}
final Set<String> knownVariables = getKnownVariableNames(group);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 8960cefad4..77830d323b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -135,6 +135,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -186,7 +187,7 @@ public final class StandardProcessGroup implements ProcessGroup {
private final Map<String, RemoteProcessGroup> remoteGroups = new HashMap<>();
private final Map<String, ProcessorNode> processors = new HashMap<>();
private final Map<String, Funnel> funnels = new HashMap<>();
- private final Map<String, ControllerServiceNode> controllerServices = new HashMap<>();
+ private final Map<String, ControllerServiceNode> controllerServices = new ConcurrentHashMap<>();
private final Map<String, Template> templates = new HashMap<>();
private final PropertyEncryptor encryptor;
private final MutableVariableRegistry variableRegistry;
@@ -2313,24 +2314,12 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public ControllerServiceNode getControllerService(final String id) {
- readLock.lock();
- try {
- return controllerServices.get(requireNonNull(id));
- } finally {
- readLock.unlock();
- }
+ return controllerServices.get(requireNonNull(id));
}
@Override
public Set<ControllerServiceNode> getControllerServices(final boolean recursive) {
- final Set<ControllerServiceNode> services = new HashSet<>();
-
- readLock.lock();
- try {
- services.addAll(controllerServices.values());
- } finally {
- readLock.unlock();
- }
+ final Set<ControllerServiceNode> services = new HashSet<>(controllerServices.values());
if (recursive) {
final ProcessGroup parentGroup = parent.get();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 4cafa8429f..d3409fdde0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -2890,6 +2890,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public RevisionUpdate<ControllerServiceReferencingComponentsEntity> update() {
final Set<ComponentNode> updated = controllerServiceDAO.updateControllerServiceReferencingComponents(controllerServiceId, scheduledState, controllerServiceState);
+ controllerFacade.save();
+
final ControllerServiceReference updatedReference = controllerServiceDAO.getControllerService(controllerServiceId).getReferences();
// get the revisions of the updated components
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
index 7a5d89e1a2..faf1409b9f 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
@@ -873,6 +873,40 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
});
}
+ @Test
+ public void testRejoinAfterControllerServiceEnabled() throws NiFiClientException, IOException, InterruptedException {
+ final ControllerServiceEntity controllerService = getClientUtil().createControllerService("StandardCountService");
+ disconnectNode(2);
+
+ getClientUtil().enableControllerService(controllerService);
+ reconnectNode(2);
+ waitForAllNodesConnected();
+
+ switchClientToNode(2);
+ waitFor(() -> {
+ final ControllerServiceEntity currentService = getNifiClient().getControllerServicesClient(DO_NOT_REPLICATE).getControllerService(controllerService.getId());
+ return ControllerServiceState.ENABLED.name().equals(currentService.getComponent().getState());
+ });
+ }
+
+ @Test
+ public void testRejoinAfterControllerServiceDisabled() throws NiFiClientException, IOException, InterruptedException {
+ final ControllerServiceEntity controllerService = getClientUtil().createControllerService("StandardCountService");
+ getClientUtil().enableControllerService(controllerService);
+
+ disconnectNode(2);
+ getClientUtil().disableControllerService(controllerService);
+
+ reconnectNode(2);
+ waitForAllNodesConnected();
+
+ switchClientToNode(2);
+ waitFor(() -> {
+ final ControllerServiceEntity currentService = getNifiClient().getControllerServicesClient(DO_NOT_REPLICATE).getControllerService(controllerService.getId());
+ return ControllerServiceState.DISABLED.name().equals(currentService.getComponent().getState());
+ });
+ }
+
private VersionedDataflow getNode2Flow() throws IOException {
final File instanceDir = getNiFiInstance().getNodeInstance(2).getInstanceDirectory();