You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/11/21 22:15:10 UTC
nifi git commit: NIFI-2996 - validate processors only when they are
in the STOPPED state - report validation errors via REST API on
processors/services/tasks/ports only when they are in the STOPPED state -
This closes #1192
Repository: nifi
Updated Branches:
refs/heads/master fb9cbccc3 -> 15af764dd
NIFI-2996
- validate processors only when they are in the STOPPED state
- report validation errors via REST API on processors/services/tasks/ports only when they are in the STOPPED state
- This closes #1192
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/15af764d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/15af764d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/15af764d
Branch: refs/heads/master
Commit: 15af764dd8ad8c4a6789c556635e817a2a553828
Parents: fb9cbcc
Author: Mike Moser <mo...@apache.org>
Authored: Wed Nov 9 12:38:04 2016 -0500
Committer: Matt Gilman <ma...@gmail.com>
Committed: Mon Nov 21 17:02:25 2016 -0500
----------------------------------------------------------------------
.../apache/nifi/controller/FlowController.java | 12 +--
.../nifi/controller/StandardProcessorNode.java | 80 ++++++++++----------
.../reporting/AbstractReportingTaskNode.java | 10 +++
.../service/StandardControllerServiceNode.java | 9 +++
.../controller/TestStandardProcessorNode.java | 16 ++++
.../nifi/remote/StandardRemoteGroupPort.java | 41 +++++-----
.../nifi/remote/StandardRootGroupPort.java | 17 +++--
7 files changed, 115 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/15af764d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 48b9c14..adab649 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2722,16 +2722,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
status.setFlowFilesRemoved(entry.getFlowFilesRemoved());
}
- // determine the run status and get any validation errors... must check
- // is valid when not disabled since a processors validity could change due
- // to environmental conditions (property configured with a file path and
- // the file being externally removed)
+ // Determine the run status and get any validation error... only validating while STOPPED
+ // is a trade-off we are willing to make, even though processor validity could change due to
+ // environmental conditions (property configured with a file path and the file being externally
+ // removed). This saves on validation costs that would be unnecessary most of the time.
if (ScheduledState.DISABLED.equals(procNode.getScheduledState())) {
status.setRunStatus(RunStatus.Disabled);
- } else if (!procNode.isValid()) {
- status.setRunStatus(RunStatus.Invalid);
} else if (ScheduledState.RUNNING.equals(procNode.getScheduledState())) {
status.setRunStatus(RunStatus.Running);
+ } else if (!procNode.isValid()) {
+ status.setRunStatus(RunStatus.Invalid);
} else {
status.setRunStatus(RunStatus.Stopped);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/15af764d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index f94cc8b..3d2c222 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -987,50 +987,54 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
public Collection<ValidationResult> getValidationErrors() {
final List<ValidationResult> results = new ArrayList<>();
try {
- final ValidationContext validationContext = this.getValidationContextFactory()
- .newValidationContext(getProperties(), getAnnotationData(), getProcessGroup().getIdentifier(), getIdentifier());
-
- final Collection<ValidationResult> validationResults;
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass(), processor.getIdentifier())) {
- validationResults = getProcessor().validate(validationContext);
- }
-
- for (final ValidationResult result : validationResults) {
- if (!result.isValid()) {
- results.add(result);
+ // Processors may go invalid while RUNNING, but only validating while STOPPED is a trade-off
+ // we are willing to make in order to save on validation costs that would be unnecessary most of the time.
+ if (getScheduledState() == ScheduledState.STOPPED) {
+ final ValidationContext validationContext = this.getValidationContextFactory()
+ .newValidationContext(getProperties(), getAnnotationData(), getProcessGroup().getIdentifier(), getIdentifier());
+
+ final Collection<ValidationResult> validationResults;
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass(), processor.getIdentifier())) {
+ validationResults = getProcessor().validate(validationContext);
}
- }
- for (final Relationship relationship : getUndefinedRelationships()) {
- if (!isAutoTerminated(relationship)) {
- final ValidationResult error = new ValidationResult.Builder()
- .explanation("Relationship '" + relationship.getName()
- + "' is not connected to any component and is not auto-terminated")
- .subject("Relationship " + relationship.getName()).valid(false).build();
- results.add(error);
+ for (final ValidationResult result : validationResults) {
+ if (!result.isValid()) {
+ results.add(result);
+ }
}
- }
- switch (getInputRequirement()) {
- case INPUT_ALLOWED:
- break;
- case INPUT_FORBIDDEN: {
- final int incomingConnCount = getIncomingNonLoopConnections().size();
- if (incomingConnCount != 0) {
- results.add(new ValidationResult.Builder().explanation(
- "Processor does not allow upstream connections but currently has " + incomingConnCount)
- .subject("Upstream Connections").valid(false).build());
+ for (final Relationship relationship : getUndefinedRelationships()) {
+ if (!isAutoTerminated(relationship)) {
+ final ValidationResult error = new ValidationResult.Builder()
+ .explanation("Relationship '" + relationship.getName()
+ + "' is not connected to any component and is not auto-terminated")
+ .subject("Relationship " + relationship.getName()).valid(false).build();
+ results.add(error);
+ }
}
- break;
- }
- case INPUT_REQUIRED: {
- if (getIncomingNonLoopConnections().isEmpty()) {
- results.add(new ValidationResult.Builder()
- .explanation("Processor requires an upstream connection but currently has none")
- .subject("Upstream Connections").valid(false).build());
+
+ switch (getInputRequirement()) {
+ case INPUT_ALLOWED:
+ break;
+ case INPUT_FORBIDDEN: {
+ final int incomingConnCount = getIncomingNonLoopConnections().size();
+ if (incomingConnCount != 0) {
+ results.add(new ValidationResult.Builder().explanation(
+ "Processor does not allow upstream connections but currently has " + incomingConnCount)
+ .subject("Upstream Connections").valid(false).build());
+ }
+ break;
+ }
+ case INPUT_REQUIRED: {
+ if (getIncomingNonLoopConnections().isEmpty()) {
+ results.add(new ValidationResult.Builder()
+ .explanation("Processor requires an upstream connection but currently has none")
+ .subject("Upstream Connections").valid(false).build());
+ }
+ break;
+ }
}
- break;
- }
}
} catch (final Throwable t) {
results.add(new ValidationResult.Builder().explanation("Failed to run validation due to " + t.toString())
http://git-wip-us.apache.org/repos/asf/nifi/blob/15af764d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
index ac5525c..deca385 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
@@ -34,6 +34,7 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FormatUtils;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -241,4 +242,13 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
public String getProcessGroupIdentifier() {
return null;
}
+
+ @Override
+ public Collection<ValidationResult> getValidationErrors(Set<String> serviceIdentifiersNotToValidate) {
+ Collection<ValidationResult> results = null;
+ if (getScheduledState() == ScheduledState.STOPPED) {
+ results = super.getValidationErrors(serviceIdentifiersNotToValidate);
+ }
+ return results != null ? results : Collections.emptySet();
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/15af764d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index d4a16b3..2c5b096 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -438,4 +438,13 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
final ProcessGroup procGroup = getProcessGroup();
return procGroup == null ? null : procGroup.getIdentifier();
}
+
+ @Override
+ public Collection<ValidationResult> getValidationErrors(Set<String> serviceIdentifiersNotToValidate) {
+ Collection<ValidationResult> results = null;
+ if (stateRef.get() == ControllerServiceState.DISABLED) {
+ results = super.getValidationErrors(serviceIdentifiersNotToValidate);
+ }
+ return results != null ? results : Collections.emptySet();
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/15af764d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
index ce89ba5..e9623e3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
@@ -118,6 +118,22 @@ public class TestStandardProcessorNode {
}
@Test
+ public void testDisabledValidationErrors() {
+ final ModifiesClasspathNoAnnotationProcessor processor = new ModifiesClasspathNoAnnotationProcessor();
+ final StandardProcessorNode procNode = createProcessorNode(processor);
+
+ // Set a property to an invalid value
+ final Map<String, String> properties = new HashMap<>();
+ properties.put(ModifiesClasspathNoAnnotationProcessor.CLASSPATH_RESOURCE.getName(), "");
+ procNode.setProperties(properties);
+ Assert.assertTrue(procNode.getValidationErrors().size() > 0);
+
+ // Disabled processors skip property validation
+ procNode.disable();
+ Assert.assertFalse(procNode.getValidationErrors().size() > 0);
+ }
+
+ @Test
public void testSinglePropertyDynamicallyModifiesClasspath() throws MalformedURLException {
final PropertyDescriptor classpathProp = new PropertyDescriptor.Builder().name("Classpath Resources")
.dynamicallyModifiesClasspath(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
http://git-wip-us.apache.org/repos/asf/nifi/blob/15af764d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index e8b542e..48d60d6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -34,6 +34,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.groups.ProcessGroup;
@@ -367,7 +368,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
final String dataSize = FormatUtils.formatDataSize(bytesReceived);
- logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{
+ logger.info("{} Successfully received {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{
this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate});
}
@@ -381,31 +382,33 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
@Override
public boolean isValid() {
- return getValidationErrors().isEmpty();
+ return targetExists.get()
+ && (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT ? !getConnections(Relationship.ANONYMOUS).isEmpty() : true);
}
@Override
public Collection<ValidationResult> getValidationErrors() {
final Collection<ValidationResult> validationErrors = new ArrayList<>();
- ValidationResult error = null;
- if (!targetExists.get()) {
- error = new ValidationResult.Builder()
- .explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName()))
- .subject(String.format("Remote port '%s'", getName()))
- .valid(false)
- .build();
- } else if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) {
- error = new ValidationResult.Builder()
- .explanation(String.format("Port '%s' has no outbound connections", getName()))
- .subject(String.format("Remote port '%s'", getName()))
- .valid(false)
- .build();
- }
+ if (getScheduledState() == ScheduledState.STOPPED) {
+ ValidationResult error = null;
+ if (!targetExists.get()) {
+ error = new ValidationResult.Builder()
+ .explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName()))
+ .subject(String.format("Remote port '%s'", getName()))
+ .valid(false)
+ .build();
+ } else if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) {
+ error = new ValidationResult.Builder()
+ .explanation(String.format("Port '%s' has no outbound connections", getName()))
+ .subject(String.format("Remote port '%s'", getName()))
+ .valid(false)
+ .build();
+ }
- if (error != null) {
- validationErrors.add(error);
+ if (error != null) {
+ validationErrors.add(error);
+ }
}
-
return validationErrors;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/15af764d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
index 4c78dd5..5cb26ea 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
@@ -30,6 +30,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.controller.AbstractPort;
import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.groups.ProcessGroup;
@@ -273,13 +274,15 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
@Override
public Collection<ValidationResult> getValidationErrors() {
final Collection<ValidationResult> validationErrors = new ArrayList<>();
- if (!isValid()) {
- final ValidationResult error = new ValidationResult.Builder()
- .explanation(String.format("Output connection for port '%s' is not defined.", getName()))
- .subject(String.format("Port '%s'", getName()))
- .valid(false)
- .build();
- validationErrors.add(error);
+ if (getScheduledState() == ScheduledState.STOPPED) {
+ if (!isValid()) {
+ final ValidationResult error = new ValidationResult.Builder()
+ .explanation(String.format("Output connection for port '%s' is not defined.", getName()))
+ .subject(String.format("Port '%s'", getName()))
+ .valid(false)
+ .build();
+ validationErrors.add(error);
+ }
}
return validationErrors;
}