You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by js...@apache.org on 2017/04/27 19:40:07 UTC
nifi git commit: NIFI-3725 - validate processors only when they are
in STOPPED state - report validation errors via REST API on
processors/services/tasks/ports only when they are in the STOPPED state
Repository: nifi
Updated Branches:
refs/heads/0.x b69b3b6c6 -> 8a18d2666
NIFI-3725 - validate processors only when they are in STOPPED state - report validation errors via REST API on processors/services/tasks/ports only when they are in the STOPPED state
Signed-off-by: Joe Skora <js...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8a18d266
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8a18d266
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8a18d266
Branch: refs/heads/0.x
Commit: 8a18d266666c28bcb0e2df6b30359976638637d9
Parents: b69b3b6
Author: Mike Moser <mo...@apache.org>
Authored: Wed Apr 26 18:00:32 2017 +0000
Committer: Joe Skora <js...@apache.org>
Committed: Thu Apr 27 15:39:25 2017 -0400
----------------------------------------------------------------------
.../apache/nifi/controller/FlowController.java | 12 ++--
.../nifi/controller/StandardProcessorNode.java | 72 +++++++++++---------
.../reporting/AbstractReportingTaskNode.java | 10 +++
.../service/StandardControllerServiceNode.java | 9 +++
.../nifi/remote/StandardRemoteGroupPort.java | 49 +++++++------
.../nifi/remote/StandardRootGroupPort.java | 17 +++--
6 files changed, 103 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/8a18d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 4559706..611e471 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2551,16 +2551,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
status.setBytesSent(entry.getBytesSent());
}
- // determine the run status and get any validation errors... must check
- // is valid when not disabled since a processors validity could change due
- // to environmental conditions (property configured with a file path and
- // the file being externally removed)
+ // Determine the run status and get any validation errors... only validating while STOPPED
+ // is a trade-off we are willing to make, even though processor validity could change due to
+ // environmental conditions (property configured with a file path and the file being externally
+ // removed). This saves on validation costs that would be unnecessary most of the time.
if (ScheduledState.DISABLED.equals(procNode.getScheduledState())) {
status.setRunStatus(RunStatus.Disabled);
- } else if (!procNode.isValid()) {
- status.setRunStatus(RunStatus.Invalid);
} else if (ScheduledState.RUNNING.equals(procNode.getScheduledState())) {
status.setRunStatus(RunStatus.Running);
+ } else if (!procNode.isValid()) {
+ status.setRunStatus(RunStatus.Invalid);
} else {
status.setRunStatus(RunStatus.Stopped);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/8a18d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 5b45d2a..bb93fa2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -946,50 +946,54 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
final List<ValidationResult> results = new ArrayList<>();
try {
- final ValidationContext validationContext = this.getValidationContextFactory()
- .newValidationContext(getProperties(), getAnnotationData());
-
- final Collection<ValidationResult> validationResults;
- try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
- validationResults = getProcessor().validate(validationContext);
- }
+ // Processors may go invalid while RUNNING, but only validating while STOPPED is a trade-off
+ // we are willing to make in order to save on validation costs that would be unnecessary most of the time.
+ if (getScheduledState() == ScheduledState.STOPPED) {
+ final ValidationContext validationContext = this.getValidationContextFactory()
+ .newValidationContext(getProperties(), getAnnotationData());
+
+ final Collection<ValidationResult> validationResults;
+ try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+ validationResults = getProcessor().validate(validationContext);
+ }
- for (final ValidationResult result : validationResults) {
- if (!result.isValid()) {
- results.add(result);
+ for (final ValidationResult result : validationResults) {
+ if (!result.isValid()) {
+ results.add(result);
+ }
}
- }
- for (final Relationship relationship : getUndefinedRelationships()) {
- if (!isAutoTerminated(relationship)) {
- final ValidationResult error = new ValidationResult.Builder()
+ for (final Relationship relationship : getUndefinedRelationships()) {
+ if (!isAutoTerminated(relationship)) {
+ final ValidationResult error = new ValidationResult.Builder()
.explanation("Relationship '" + relationship.getName()
+ "' is not connected to any component and is not auto-terminated")
.subject("Relationship " + relationship.getName()).valid(false).build();
- results.add(error);
+ results.add(error);
+ }
}
- }
- switch (getInputRequirement()) {
- case INPUT_ALLOWED:
- break;
- case INPUT_FORBIDDEN: {
- final int incomingConnCount = getIncomingNonLoopConnections().size();
- if (incomingConnCount != 0) {
- results.add(new ValidationResult.Builder().explanation(
- "Processor does not allow upstream connections but currently has " + incomingConnCount)
- .subject("Upstream Connections").valid(false).build());
+ switch (getInputRequirement()) {
+ case INPUT_ALLOWED:
+ break;
+ case INPUT_FORBIDDEN: {
+ final int incomingConnCount = getIncomingNonLoopConnections().size();
+ if (incomingConnCount != 0) {
+ results.add(new ValidationResult.Builder().explanation(
+ "Processor does not allow upstream connections but currently has " + incomingConnCount)
+ .subject("Upstream Connections").valid(false).build());
+ }
+ break;
+ }
+ case INPUT_REQUIRED: {
+ if (getIncomingNonLoopConnections().isEmpty()) {
+ results.add(new ValidationResult.Builder()
+ .explanation("Processor requires an upstream connection but currently has none")
+ .subject("Upstream Connections").valid(false).build());
+ }
+ break;
}
- break;
- }
- case INPUT_REQUIRED: {
- if (getIncomingNonLoopConnections().isEmpty()) {
- results.add(new ValidationResult.Builder()
- .explanation("Processor requires an upstream connection but currently has none")
- .subject("Upstream Connections").valid(false).build());
}
- break;
- }
}
} catch (final Throwable t) {
results.add(new ValidationResult.Builder().explanation("Failed to run validation due to " + t.toString())
http://git-wip-us.apache.org/repos/asf/nifi/blob/8a18d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
index 31c2242..21a6553 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
@@ -17,6 +17,7 @@
package org.apache.nifi.controller.reporting;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -259,4 +260,13 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
public String toString() {
return "ReportingTask[id=" + getIdentifier() + ", name=" + getName() + "]";
}
+
+ @Override
+ public Collection<ValidationResult> getValidationErrors(Set<String> serviceIdentifiersNotToValidate) {
+ Collection<ValidationResult> results = null;
+ if (getScheduledState() == ScheduledState.STOPPED) {
+ results = super.getValidationErrors(serviceIdentifiersNotToValidate);
+ }
+ return results != null ? results : Collections.<ValidationResult>emptySet();
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/8a18d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 4aa9ab6..e8e3810 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -395,4 +395,13 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
LOG.error("Failed to invoke @OnDisabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
}
}
+
+ @Override
+ public Collection<ValidationResult> getValidationErrors(Set<String> serviceIdentifiersNotToValidate) {
+ Collection<ValidationResult> results = null;
+ if (stateRef.get() == ControllerServiceState.DISABLED) {
+ results = super.getValidationErrors(serviceIdentifiersNotToValidate);
+ }
+ return results != null ? results : Collections.<ValidationResult>emptySet();
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/8a18d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 9f6f783..dd44bc8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -33,6 +33,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.groups.ProcessGroup;
@@ -350,7 +351,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
final String dataSize = FormatUtils.formatDataSize(bytesReceived);
- logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{
+ logger.info("{} Successfully received {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{
this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate});
}
@@ -364,31 +365,41 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
@Override
public boolean isValid() {
- return getValidationErrors().isEmpty();
+ if (!targetExists.get()) {
+ return false;
+ }
+
+ if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) {
+ // if it's an output port, ensure that there is an outbound connection
+ return false;
+ }
+
+ return true;
}
@Override
public Collection<ValidationResult> getValidationErrors() {
final Collection<ValidationResult> validationErrors = new ArrayList<>();
- ValidationResult error = null;
- if (!targetExists.get()) {
- error = new ValidationResult.Builder()
- .explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName()))
- .subject(String.format("Remote port '%s'", getName()))
- .valid(false)
- .build();
- } else if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) {
- error = new ValidationResult.Builder()
- .explanation(String.format("Port '%s' has no outbound connections", getName()))
- .subject(String.format("Remote port '%s'", getName()))
- .valid(false)
- .build();
- }
+ if (getScheduledState() == ScheduledState.STOPPED) {
+ ValidationResult error = null;
+ if (!targetExists.get()) {
+ error = new ValidationResult.Builder()
+ .explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName()))
+ .subject(String.format("Remote port '%s'", getName()))
+ .valid(false)
+ .build();
+ } else if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) {
+ error = new ValidationResult.Builder()
+ .explanation(String.format("Port '%s' has no outbound connections", getName()))
+ .subject(String.format("Remote port '%s'", getName()))
+ .valid(false)
+ .build();
+ }
- if (error != null) {
- validationErrors.add(error);
+ if (error != null) {
+ validationErrors.add(error);
+ }
}
-
return validationErrors;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/8a18d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
index 66fd303..71932f7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
@@ -44,6 +44,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.controller.AbstractPort;
import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.groups.ProcessGroup;
@@ -262,13 +263,15 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
@Override
public Collection<ValidationResult> getValidationErrors() {
final Collection<ValidationResult> validationErrors = new ArrayList<>();
- if (!isValid()) {
- final ValidationResult error = new ValidationResult.Builder()
- .explanation(String.format("Output connection for port '%s' is not defined.", getName()))
- .subject(String.format("Port '%s'", getName()))
- .valid(false)
- .build();
- validationErrors.add(error);
+ if (getScheduledState() == ScheduledState.STOPPED) {
+ if (!isValid()) {
+ final ValidationResult error = new ValidationResult.Builder()
+ .explanation(String.format("Output connection for port '%s' is not defined.", getName()))
+ .subject(String.format("Port '%s'", getName()))
+ .valid(false)
+ .build();
+ validationErrors.add(error);
+ }
}
return validationErrors;
}