You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2022/05/20 19:32:35 UTC
[nifi] 03/06: NIFI-10001: When enabling a collection of Controller Services, change… (#6042)
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch support/nifi-1.16
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 0b7d11776b786a6e73468d48252f8dc47cef6662
Author: markap14 <ma...@hotmail.com>
AuthorDate: Fri May 13 15:09:23 2022 -0400
NIFI-10001: When enabling a collection of Controller Services, change… (#6042)
* NIFI-10001: When enabling a collection of Controller Services, changed logic. Instead of enabling dependent services and waiting for them to complete enablement before starting a given service, just start the services given. The previous logic was necessary long ago because we couldn't enable a service unless all dependent services were fully enabled. But that changed a while ago. Now, we can enable a service when it's invalid. It'll just keep trying to enable until it becomes valid [...]
* NIFI-10001: Restored previous implementation for StandardControllerServiceProvider, as the changes were not ultimately what we needed. Changed StandardProcessGroup to use a ConcurrentHashMap for controller services instead of a HashMap guarded by the read lock. The read lock was causing a deadlock when we enable a Controller Service that references another service during flow synchronization. Flow Synchronization was happening within a write lock and enabling the service required a read lock on the gro [...]
---
.../service/StandardControllerServiceProvider.java | 8 ++---
.../apache/nifi/groups/StandardProcessGroup.java | 19 +++---------
.../groups/StandardProcessGroupSynchronizer.java | 21 +++++++++----
.../apache/nifi/web/StandardNiFiServiceFacade.java | 2 ++
.../system/clustering/FlowSynchronizationIT.java | 34 ++++++++++++++++++++++
5 files changed, 59 insertions(+), 25 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index a8e032a73a..592d096c8b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -449,11 +449,9 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
return (rootServiceNode == null) ? null : rootServiceNode.getProxiedControllerService();
}
- final Set<ControllerServiceNode> servicesForGroup = groupOfInterest.getControllerServices(true);
- for (final ControllerServiceNode serviceNode : servicesForGroup) {
- if (serviceIdentifier.equals(serviceNode.getIdentifier())) {
- return serviceNode.getProxiedControllerService();
- }
+ final ControllerServiceNode serviceNode = groupOfInterest.findControllerService(serviceIdentifier, false, true);
+ if (serviceNode != null) {
+ return serviceNode.getProxiedControllerService();
}
return null;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 1924f9bcfb..33fb77c620 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -133,6 +133,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -184,7 +185,7 @@ public final class StandardProcessGroup implements ProcessGroup {
private final Map<String, RemoteProcessGroup> remoteGroups = new HashMap<>();
private final Map<String, ProcessorNode> processors = new HashMap<>();
private final Map<String, Funnel> funnels = new HashMap<>();
- private final Map<String, ControllerServiceNode> controllerServices = new HashMap<>();
+ private final Map<String, ControllerServiceNode> controllerServices = new ConcurrentHashMap<>();
private final Map<String, Template> templates = new HashMap<>();
private final PropertyEncryptor encryptor;
private final MutableVariableRegistry variableRegistry;
@@ -2311,24 +2312,12 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public ControllerServiceNode getControllerService(final String id) {
- readLock.lock();
- try {
- return controllerServices.get(requireNonNull(id));
- } finally {
- readLock.unlock();
- }
+ return controllerServices.get(requireNonNull(id));
}
@Override
public Set<ControllerServiceNode> getControllerServices(final boolean recursive) {
- final Set<ControllerServiceNode> services = new HashSet<>();
-
- readLock.lock();
- try {
- services.addAll(controllerServices.values());
- } finally {
- readLock.unlock();
- }
+ final Set<ControllerServiceNode> services = new HashSet<>(controllerServices.values());
if (recursive) {
final ProcessGroup parentGroup = parent.get();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
index 8c5592dc18..fc3eb383d0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
@@ -165,6 +165,12 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
if (FlowDifferenceFilters.isScheduledStateNew(diff)) {
continue;
}
+ // If the difference type is a Scheduled State Change, we want to ignore it, because we are just trying to
+ // find components that need to be stopped in order to be updated. We don't need to stop a component in order
+ // to change its Scheduled State.
+ if (diff.getDifferenceType() == DifferenceType.SCHEDULED_STATE_CHANGED) {
+ continue;
+ }
// If this update adds a new Controller Service, then we need to check if the service already exists at a higher level
// and if so compare our VersionedControllerService to the existing service.
@@ -196,12 +202,17 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
}
if (LOG.isInfoEnabled()) {
- final String differencesByLine = flowComparison.getDifferences().stream()
- .map(FlowDifference::toString)
- .collect(Collectors.joining("\n"));
+ final Set<FlowDifference> differences = flowComparison.getDifferences();
+ if (differences.isEmpty()) {
+ LOG.info("No differences between current flow and proposed flow for {}", group);
+ } else {
+ final String differencesByLine = differences.stream()
+ .map(FlowDifference::toString)
+ .collect(Collectors.joining("\n"));
- LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", group, versionedExternalFlow,
- flowComparison.getDifferences().size(), differencesByLine);
+ LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", group, versionedExternalFlow,
+ differences.size(), differencesByLine);
+ }
}
final Set<String> knownVariables = getKnownVariableNames(group);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 4cafa8429f..d3409fdde0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -2890,6 +2890,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public RevisionUpdate<ControllerServiceReferencingComponentsEntity> update() {
final Set<ComponentNode> updated = controllerServiceDAO.updateControllerServiceReferencingComponents(controllerServiceId, scheduledState, controllerServiceState);
+ controllerFacade.save();
+
final ControllerServiceReference updatedReference = controllerServiceDAO.getControllerService(controllerServiceId).getReferences();
// get the revisions of the updated components
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
index 7a5d89e1a2..faf1409b9f 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
@@ -873,6 +873,40 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
});
}
+ @Test
+ public void testRejoinAfterControllerServiceEnabled() throws NiFiClientException, IOException, InterruptedException {
+ final ControllerServiceEntity controllerService = getClientUtil().createControllerService("StandardCountService");
+ disconnectNode(2);
+
+ getClientUtil().enableControllerService(controllerService);
+ reconnectNode(2);
+ waitForAllNodesConnected();
+
+ switchClientToNode(2);
+ waitFor(() -> {
+ final ControllerServiceEntity currentService = getNifiClient().getControllerServicesClient(DO_NOT_REPLICATE).getControllerService(controllerService.getId());
+ return ControllerServiceState.ENABLED.name().equals(currentService.getComponent().getState());
+ });
+ }
+
+ @Test
+ public void testRejoinAfterControllerServiceDisabled() throws NiFiClientException, IOException, InterruptedException {
+ final ControllerServiceEntity controllerService = getClientUtil().createControllerService("StandardCountService");
+ getClientUtil().enableControllerService(controllerService);
+
+ disconnectNode(2);
+ getClientUtil().disableControllerService(controllerService);
+
+ reconnectNode(2);
+ waitForAllNodesConnected();
+
+ switchClientToNode(2);
+ waitFor(() -> {
+ final ControllerServiceEntity currentService = getNifiClient().getControllerServicesClient(DO_NOT_REPLICATE).getControllerService(controllerService.getId());
+ return ControllerServiceState.DISABLED.name().equals(currentService.getComponent().getState());
+ });
+ }
+
private VersionedDataflow getNode2Flow() throws IOException {
final File instanceDir = getNiFiInstance().getNodeInstance(2).getInstanceDirectory();