You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2019/10/09 14:21:05 UTC

[nifi] branch master updated: NIFI-6748: Fixed bug in Parameter Contexts' affected components where if a Controller Service referenced a Parameter, any component that references that service should also be considered an affected component but wasn't. Also fixed a bug in how we handled stopping a Processor that was in the STARTING phase.

This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 47f45ab  NIFI-6748: Fixed bug in Parameter Contexts' affected components where if a Controller Service referenced a Parameter, any component that references that service should also be considered an affected component but wasn't. Also fixed a bug in how we handled stopping a Processor that was in the STARTING phase.
47f45ab is described below

commit 47f45abdf9d99ac4b46dbed56c6dac3e0404d8be
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Oct 8 14:19:40 2019 -0400

    NIFI-6748: Fixed bug in Parameter Contexts' affected components where if a Controller Service referenced a Parameter, any component that references that service should also be considered an affected component but wasn't. Also fixed a bug in how we handled stopping a Processor that was in the STARTING phase.
    
    This closes #3794.
    
    Signed-off-by: Bryan Bende <bb...@apache.org>
---
 .../web/api/entity/AffectedComponentEntity.java    |  18 +++
 .../nifi/controller/StandardProcessorNode.java     |  11 +-
 .../apache/nifi/groups/StandardProcessGroup.java   |   4 +-
 .../apache/nifi/web/StandardNiFiServiceFacade.java | 121 +++++++++++++--------
 4 files changed, 105 insertions(+), 49 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/AffectedComponentEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/AffectedComponentEntity.java
index 7ef93a7..8ba95c3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/AffectedComponentEntity.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/AffectedComponentEntity.java
@@ -21,6 +21,7 @@ import org.apache.nifi.web.api.dto.AffectedComponentDTO;
 import org.apache.nifi.web.api.dto.ProcessGroupNameDTO;
 
 import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Objects;
 
 /**
  * A serialized representation of this class can be placed in the entity body of a response to the API.
@@ -71,4 +72,21 @@ public class AffectedComponentEntity extends ComponentEntity implements Permissi
     public String toString() {
         return component == null ? "AffectedComponent[No Component]" : component.toString();
     }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(getId());
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (!(obj instanceof AffectedComponentEntity)) {
+            return false;
+        }
+
+        return Objects.equals(getId(), ((AffectedComponentEntity) obj).getId());
+    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 92805e7..96402a4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -1361,6 +1361,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
 
         final Processor processor = processorRef.get().getProcessor();
         final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
+        LOG.info("Starting {}", this);
 
         ScheduledState currentState;
         boolean starting;
@@ -1498,7 +1499,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
             final ProcessContext processContext = processContextFactory.get();
 
             final ScheduledState currentScheduleState = scheduledState.get();
-            if (currentScheduleState == ScheduledState.STOPPING || currentScheduleState == ScheduledState.STOPPED) {
+            if (currentScheduleState == ScheduledState.STOPPING || currentScheduleState == ScheduledState.STOPPED || getDesiredState() == ScheduledState.STOPPED) {
                 LOG.debug("{} is stopped. Will not call @OnScheduled lifecycle methods or begin trigger onTrigger() method", StandardProcessorNode.this);
                 schedulingAgentCallback.onTaskComplete();
                 scheduledState.set(ScheduledState.STOPPED);
@@ -1648,7 +1649,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
             final SchedulingAgent schedulingAgent, final LifecycleState scheduleState) {
 
         final Processor processor = processorRef.get().getProcessor();
-        LOG.info("Stopping processor: " + processor.getClass());
+        LOG.info("Stopping processor: " + this);
         desiredState = ScheduledState.STOPPED;
 
         final CompletableFuture<Void> future = new CompletableFuture<>();
@@ -1722,7 +1723,11 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
             // before stop() was called. If that happens the stop processor
             // routine will be initiated in start() method, otherwise the IF
             // part will handle the stop processor routine.
-            this.scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.STOPPING);
+            final boolean updated = this.scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.STOPPING);
+            if (updated) {
+                LOG.debug("Transitioned state of {} from STARTING to STOPPING", this);
+            }
+
             future.complete(null);
         }
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index b52d68c..f04286a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -1397,8 +1397,6 @@ public final class StandardProcessGroup implements ProcessGroup {
             final ScheduledState state = processor.getScheduledState();
             if (state == ScheduledState.DISABLED) {
                 throw new IllegalStateException("Processor is disabled");
-            } else if (state == ScheduledState.STOPPED) {
-                return CompletableFuture.completedFuture(null);
             }
 
             return scheduler.stopProcessor(processor);
@@ -2954,7 +2952,7 @@ public final class StandardProcessGroup implements ProcessGroup {
                 }
 
                 if (service.getState() != ControllerServiceState.DISABLED) {
-                    throw new IllegalStateException("Cannot change Parameter Context for " + this + " because " + service + " is referencing at least one Parameter is is not disabled");
+                    throw new IllegalStateException("Cannot change Parameter Context for " + this + " because " + service + " is referencing at least one Parameter and is not disabled");
                 }
 
                 verifyParameterSensitivityIsValid(service, parameterContext);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 9c96cc5..6532f51 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -16,35 +16,7 @@
  */
 package org.apache.nifi.web;
 
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
-
+import com.google.common.collect.Sets;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.nifi.action.Action;
 import org.apache.nifi.action.Component;
@@ -338,7 +310,33 @@ import org.apache.nifi.web.util.SnippetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Sets;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Implementation of NiFiServiceFacade that performs revision checking.
@@ -1282,22 +1280,21 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
             for (final ControllerServiceNode service : group.getControllerServices(false)) {
                 if (includeInactive || service.isActive()) {
                     final Set<String> referencedParams = service.getReferencedParameterNames();
-                    final boolean referencesUpdatedParam = referencedParams.stream().anyMatch(updatedParameterNames::contains);
-
-                    if (referencesUpdatedParam) {
-                        affectedComponents.add(service);
-
-                        final AffectedComponentEntity affectedComponentEntity = dtoFactory.createAffectedComponentEntity(service, revisionManager);
-
-                        for (final String referencedParam : referencedParams) {
-                            for (final ParameterEntity paramEntity : parameterContextDto.getParameters()) {
-                                final ParameterDTO paramDto = paramEntity.getParameter();
-                                if (referencedParam.equals(paramDto.getName())) {
-                                    paramDto.getReferencingComponents().add(affectedComponentEntity);
-                                }
+                    final Set<String> updatedReferencedParams = referencedParams.stream().filter(updatedParameterNames::contains).collect(Collectors.toSet());
+
+                    final List<ParameterDTO> affectedParameterDtos = new ArrayList<>();
+                    for (final String referencedParam : referencedParams) {
+                        for (final ParameterEntity paramEntity : parameterContextDto.getParameters()) {
+                            final ParameterDTO paramDto = paramEntity.getParameter();
+                            if (referencedParam.equals(paramDto.getName())) {
+                                affectedParameterDtos.add(paramDto);
                             }
                         }
                     }
+
+                    if (!updatedReferencedParams.isEmpty()) {
+                        addReferencingComponents(service, affectedComponents, affectedParameterDtos, includeInactive);
+                    }
                 }
             }
         }
@@ -1305,6 +1302,44 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         return dtoFactory.createAffectedComponentEntities(affectedComponents, revisionManager);
     }
 
+    private void addReferencingComponents(final ControllerServiceNode service, final Set<ComponentNode> affectedComponents, final List<ParameterDTO> affectedParameterDtos,
+                                          final boolean includeInactive) {
+
+        // We keep a mapping of Affected Components for the Parameter Context Update as well as a set of all Affected Components for each updated Parameter.
+        // We must update both of these.
+        affectedComponents.add(service);
+
+        // Update Parameter DTO to also reflect the Affected Component.
+        final AffectedComponentEntity affectedComponentEntity = dtoFactory.createAffectedComponentEntity(service, revisionManager);
+        affectedParameterDtos.forEach(dto -> dto.getReferencingComponents().add(affectedComponentEntity));
+
+        for (final ComponentNode referencingComponent : service.getReferences().getReferencingComponents()) {
+            if (includeInactive || isActive(referencingComponent)) {
+                // We must update both the Set of Affected Components as well as the Affected Components for the referenced parameter.
+                affectedComponents.add(referencingComponent);
+
+                final AffectedComponentEntity referencingComponentEntity = dtoFactory.createAffectedComponentEntity(referencingComponent, revisionManager);
+                affectedParameterDtos.forEach(dto -> dto.getReferencingComponents().add(referencingComponentEntity));
+
+                if (referencingComponent instanceof ControllerServiceNode) {
+                    addReferencingComponents((ControllerServiceNode) referencingComponent, affectedComponents, affectedParameterDtos, includeInactive);
+                }
+            }
+        }
+    }
+
+    private boolean isActive(final ComponentNode componentNode) {
+        if (componentNode instanceof ControllerServiceNode) {
+            return ((ControllerServiceNode) componentNode).isActive();
+        }
+
+        if (componentNode instanceof ProcessorNode) {
+            return ((ProcessorNode) componentNode).isRunning();
+        }
+
+        return false;
+    }
+
     private Set<String> getUpdatedParameterNames(final ParameterContextDTO parameterContextDto) {
         final ParameterContext parameterContext = parameterContextDAO.getParameterContext(parameterContextDto.getId());