You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2019/10/09 14:21:05 UTC
[nifi] branch master updated: NIFI-6748: Fixed bug in Parameter Contexts' affected components where if a Controller Service referenced a Parameter, any component that references that service should also be considered an affected component but wasn't. Also fixed a bug in how we handled stopping a Processor that was in the STARTING phase.
This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 47f45ab NIFI-6748: Fixed bug in Parameter Contexts' affected components where if a Controller Service referenced a Parameter, any component that references that service should also be considered an affected component but wasn't. Also fixed a bug in how we handled stopping a Processor that was in the STARTING phase.
47f45ab is described below
commit 47f45abdf9d99ac4b46dbed56c6dac3e0404d8be
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Oct 8 14:19:40 2019 -0400
NIFI-6748: Fixed bug in Parameter Contexts' affected components where if a Controller Service referenced a Parameter, any component that references that service should also be considered an affected component but wasn't. Also fixed a bug in how we handled stopping a Processor that was in the STARTING phase.
This closes #3794.
Signed-off-by: Bryan Bende <bb...@apache.org>
---
.../web/api/entity/AffectedComponentEntity.java | 18 +++
.../nifi/controller/StandardProcessorNode.java | 11 +-
.../apache/nifi/groups/StandardProcessGroup.java | 4 +-
.../apache/nifi/web/StandardNiFiServiceFacade.java | 121 +++++++++++++--------
4 files changed, 105 insertions(+), 49 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/AffectedComponentEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/AffectedComponentEntity.java
index 7ef93a7..8ba95c3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/AffectedComponentEntity.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/AffectedComponentEntity.java
@@ -21,6 +21,7 @@ import org.apache.nifi.web.api.dto.AffectedComponentDTO;
import org.apache.nifi.web.api.dto.ProcessGroupNameDTO;
import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Objects;
/**
* A serialized representation of this class can be placed in the entity body of a response to the API.
@@ -71,4 +72,21 @@ public class AffectedComponentEntity extends ComponentEntity implements Permissi
public String toString() {
return component == null ? "AffectedComponent[No Component]" : component.toString();
}
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(getId());
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof AffectedComponentEntity)) {
+ return false;
+ }
+
+ return Objects.equals(getId(), ((AffectedComponentEntity) obj).getId());
+ }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 92805e7..96402a4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -1361,6 +1361,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
final Processor processor = processorRef.get().getProcessor();
final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
+ LOG.info("Starting {}", this);
ScheduledState currentState;
boolean starting;
@@ -1498,7 +1499,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
final ProcessContext processContext = processContextFactory.get();
final ScheduledState currentScheduleState = scheduledState.get();
- if (currentScheduleState == ScheduledState.STOPPING || currentScheduleState == ScheduledState.STOPPED) {
+ if (currentScheduleState == ScheduledState.STOPPING || currentScheduleState == ScheduledState.STOPPED || getDesiredState() == ScheduledState.STOPPED) {
LOG.debug("{} is stopped. Will not call @OnScheduled lifecycle methods or begin trigger onTrigger() method", StandardProcessorNode.this);
schedulingAgentCallback.onTaskComplete();
scheduledState.set(ScheduledState.STOPPED);
@@ -1648,7 +1649,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
final SchedulingAgent schedulingAgent, final LifecycleState scheduleState) {
final Processor processor = processorRef.get().getProcessor();
- LOG.info("Stopping processor: " + processor.getClass());
+ LOG.info("Stopping processor: " + this);
desiredState = ScheduledState.STOPPED;
final CompletableFuture<Void> future = new CompletableFuture<>();
@@ -1722,7 +1723,11 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
// before stop() was called. If that happens the stop processor
// routine will be initiated in start() method, otherwise the IF
// part will handle the stop processor routine.
- this.scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.STOPPING);
+ final boolean updated = this.scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.STOPPING);
+ if (updated) {
+ LOG.debug("Transitioned state of {} from STARTING to STOPPING", this);
+ }
+
future.complete(null);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index b52d68c..f04286a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -1397,8 +1397,6 @@ public final class StandardProcessGroup implements ProcessGroup {
final ScheduledState state = processor.getScheduledState();
if (state == ScheduledState.DISABLED) {
throw new IllegalStateException("Processor is disabled");
- } else if (state == ScheduledState.STOPPED) {
- return CompletableFuture.completedFuture(null);
}
return scheduler.stopProcessor(processor);
@@ -2954,7 +2952,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
if (service.getState() != ControllerServiceState.DISABLED) {
- throw new IllegalStateException("Cannot change Parameter Context for " + this + " because " + service + " is referencing at least one Parameter is is not disabled");
+ throw new IllegalStateException("Cannot change Parameter Context for " + this + " because " + service + " is referencing at least one Parameter and is not disabled");
}
verifyParameterSensitivityIsValid(service, parameterContext);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 9c96cc5..6532f51 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -16,35 +16,7 @@
*/
package org.apache.nifi.web;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
-
+import com.google.common.collect.Sets;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
@@ -338,7 +310,33 @@ import org.apache.nifi.web.util.SnippetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Sets;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* Implementation of NiFiServiceFacade that performs revision checking.
@@ -1282,22 +1280,21 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
for (final ControllerServiceNode service : group.getControllerServices(false)) {
if (includeInactive || service.isActive()) {
final Set<String> referencedParams = service.getReferencedParameterNames();
- final boolean referencesUpdatedParam = referencedParams.stream().anyMatch(updatedParameterNames::contains);
-
- if (referencesUpdatedParam) {
- affectedComponents.add(service);
-
- final AffectedComponentEntity affectedComponentEntity = dtoFactory.createAffectedComponentEntity(service, revisionManager);
-
- for (final String referencedParam : referencedParams) {
- for (final ParameterEntity paramEntity : parameterContextDto.getParameters()) {
- final ParameterDTO paramDto = paramEntity.getParameter();
- if (referencedParam.equals(paramDto.getName())) {
- paramDto.getReferencingComponents().add(affectedComponentEntity);
- }
+ final Set<String> updatedReferencedParams = referencedParams.stream().filter(updatedParameterNames::contains).collect(Collectors.toSet());
+
+ final List<ParameterDTO> affectedParameterDtos = new ArrayList<>();
+ for (final String referencedParam : referencedParams) {
+ for (final ParameterEntity paramEntity : parameterContextDto.getParameters()) {
+ final ParameterDTO paramDto = paramEntity.getParameter();
+ if (referencedParam.equals(paramDto.getName())) {
+ affectedParameterDtos.add(paramDto);
}
}
}
+
+ if (!updatedReferencedParams.isEmpty()) {
+ addReferencingComponents(service, affectedComponents, affectedParameterDtos, includeInactive);
+ }
}
}
}
@@ -1305,6 +1302,44 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return dtoFactory.createAffectedComponentEntities(affectedComponents, revisionManager);
}
+ private void addReferencingComponents(final ControllerServiceNode service, final Set<ComponentNode> affectedComponents, final List<ParameterDTO> affectedParameterDtos,
+ final boolean includeInactive) {
+
+ // We keep a mapping of Affected Components for the Parameter Context Update as well as a set of all Affected Components for each updated Parameter.
+ // We must update both of these.
+ affectedComponents.add(service);
+
+ // Update Parameter DTO to also reflect the Affected Component.
+ final AffectedComponentEntity affectedComponentEntity = dtoFactory.createAffectedComponentEntity(service, revisionManager);
+ affectedParameterDtos.forEach(dto -> dto.getReferencingComponents().add(affectedComponentEntity));
+
+ for (final ComponentNode referencingComponent : service.getReferences().getReferencingComponents()) {
+ if (includeInactive || isActive(referencingComponent)) {
+ // We must update both the Set of Affected Components as well as the Affected Components for the referenced parameter.
+ affectedComponents.add(referencingComponent);
+
+ final AffectedComponentEntity referencingComponentEntity = dtoFactory.createAffectedComponentEntity(referencingComponent, revisionManager);
+ affectedParameterDtos.forEach(dto -> dto.getReferencingComponents().add(referencingComponentEntity));
+
+ if (referencingComponent instanceof ControllerServiceNode) {
+ addReferencingComponents((ControllerServiceNode) referencingComponent, affectedComponents, affectedParameterDtos, includeInactive);
+ }
+ }
+ }
+ }
+
+ private boolean isActive(final ComponentNode componentNode) {
+ if (componentNode instanceof ControllerServiceNode) {
+ return ((ControllerServiceNode) componentNode).isActive();
+ }
+
+ if (componentNode instanceof ProcessorNode) {
+ return ((ProcessorNode) componentNode).isRunning();
+ }
+
+ return false;
+ }
+
private Set<String> getUpdatedParameterNames(final ParameterContextDTO parameterContextDto) {
final ParameterContext parameterContext = parameterContextDAO.getParameterContext(parameterContextDto.getId());