You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by js...@apache.org on 2017/04/27 19:40:07 UTC

nifi git commit: NIFI-3725 - validate processors only when they are in STOPPED state - report validation errors via REST API on processors/services/tasks/ports only when they are in the STOPPED state

Repository: nifi
Updated Branches:
  refs/heads/0.x b69b3b6c6 -> 8a18d2666


NIFI-3725 - validate processors only when they are in STOPPED state - report validation errors via REST API on processors/services/tasks/ports only when they are in the STOPPED state

Signed-off-by: Joe Skora <js...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8a18d266
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8a18d266
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8a18d266

Branch: refs/heads/0.x
Commit: 8a18d266666c28bcb0e2df6b30359976638637d9
Parents: b69b3b6
Author: Mike Moser <mo...@apache.org>
Authored: Wed Apr 26 18:00:32 2017 +0000
Committer: Joe Skora <js...@apache.org>
Committed: Thu Apr 27 15:39:25 2017 -0400

----------------------------------------------------------------------
 .../apache/nifi/controller/FlowController.java  | 12 ++--
 .../nifi/controller/StandardProcessorNode.java  | 72 +++++++++++---------
 .../reporting/AbstractReportingTaskNode.java    | 10 +++
 .../service/StandardControllerServiceNode.java  |  9 +++
 .../nifi/remote/StandardRemoteGroupPort.java    | 49 +++++++------
 .../nifi/remote/StandardRootGroupPort.java      | 17 +++--
 6 files changed, 103 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/8a18d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 4559706..611e471 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2551,16 +2551,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             status.setBytesSent(entry.getBytesSent());
         }
 
-        // determine the run status and get any validation errors... must check
-        // is valid when not disabled since a processors validity could change due
-        // to environmental conditions (property configured with a file path and
-        // the file being externally removed)
+        // Determine the run status and get any validation errors... only validating while STOPPED
+        // is a trade-off we are willing to make, even though processor validity could change due to
+        // environmental conditions (property configured with a file path and the file being externally
+        // removed). This saves on validation costs that would be unnecessary most of the time.
         if (ScheduledState.DISABLED.equals(procNode.getScheduledState())) {
             status.setRunStatus(RunStatus.Disabled);
-        } else if (!procNode.isValid()) {
-            status.setRunStatus(RunStatus.Invalid);
         } else if (ScheduledState.RUNNING.equals(procNode.getScheduledState())) {
             status.setRunStatus(RunStatus.Running);
+        } else if (!procNode.isValid()) {
+            status.setRunStatus(RunStatus.Invalid);
         } else {
             status.setRunStatus(RunStatus.Stopped);
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/8a18d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 5b45d2a..bb93fa2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -946,50 +946,54 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
         final List<ValidationResult> results = new ArrayList<>();
         try {
 
-            final ValidationContext validationContext = this.getValidationContextFactory()
-                    .newValidationContext(getProperties(), getAnnotationData());
-
-            final Collection<ValidationResult> validationResults;
-            try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
-                validationResults = getProcessor().validate(validationContext);
-            }
+            // Processors may go invalid while RUNNING, but only validating while STOPPED is a trade-off
+            // we are willing to make in order to save on validation costs that would be unnecessary most of the time.
+            if (getScheduledState() == ScheduledState.STOPPED) {
+                final ValidationContext validationContext = this.getValidationContextFactory()
+                        .newValidationContext(getProperties(), getAnnotationData());
+
+                final Collection<ValidationResult> validationResults;
+                try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                    validationResults = getProcessor().validate(validationContext);
+                }
 
-            for (final ValidationResult result : validationResults) {
-                if (!result.isValid()) {
-                    results.add(result);
+                for (final ValidationResult result : validationResults) {
+                    if (!result.isValid()) {
+                        results.add(result);
+                    }
                 }
-            }
 
-            for (final Relationship relationship : getUndefinedRelationships()) {
-                if (!isAutoTerminated(relationship)) {
-                    final ValidationResult error = new ValidationResult.Builder()
+                for (final Relationship relationship : getUndefinedRelationships()) {
+                    if (!isAutoTerminated(relationship)) {
+                        final ValidationResult error = new ValidationResult.Builder()
                             .explanation("Relationship '" + relationship.getName()
                                     + "' is not connected to any component and is not auto-terminated")
                             .subject("Relationship " + relationship.getName()).valid(false).build();
-                    results.add(error);
+                        results.add(error);
+                    }
                 }
-            }
 
-            switch (getInputRequirement()) {
-            case INPUT_ALLOWED:
-                break;
-            case INPUT_FORBIDDEN: {
-                final int incomingConnCount = getIncomingNonLoopConnections().size();
-                if (incomingConnCount != 0) {
-                    results.add(new ValidationResult.Builder().explanation(
-                            "Processor does not allow upstream connections but currently has " + incomingConnCount)
-                            .subject("Upstream Connections").valid(false).build());
+                switch (getInputRequirement()) {
+                case INPUT_ALLOWED:
+                    break;
+                case INPUT_FORBIDDEN: {
+                    final int incomingConnCount = getIncomingNonLoopConnections().size();
+                    if (incomingConnCount != 0) {
+                        results.add(new ValidationResult.Builder().explanation(
+                                "Processor does not allow upstream connections but currently has " + incomingConnCount)
+                                .subject("Upstream Connections").valid(false).build());
+                    }
+                    break;
+                }
+                case INPUT_REQUIRED: {
+                    if (getIncomingNonLoopConnections().isEmpty()) {
+                        results.add(new ValidationResult.Builder()
+                                .explanation("Processor requires an upstream connection but currently has none")
+                                .subject("Upstream Connections").valid(false).build());
+                    }
+                    break;
                 }
-                break;
-            }
-            case INPUT_REQUIRED: {
-                if (getIncomingNonLoopConnections().isEmpty()) {
-                    results.add(new ValidationResult.Builder()
-                            .explanation("Processor requires an upstream connection but currently has none")
-                            .subject("Upstream Connections").valid(false).build());
                 }
-                break;
-            }
             }
         } catch (final Throwable t) {
             results.add(new ValidationResult.Builder().explanation("Failed to run validation due to " + t.toString())

http://git-wip-us.apache.org/repos/asf/nifi/blob/8a18d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
index 31c2242..21a6553 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.controller.reporting;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -259,4 +260,13 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
     public String toString() {
         return "ReportingTask[id=" + getIdentifier() + ", name=" + getName() + "]";
     }
+
+    @Override
+    public Collection<ValidationResult> getValidationErrors(Set<String> serviceIdentifiersNotToValidate) {
+        Collection<ValidationResult> results = null;
+        if (getScheduledState() == ScheduledState.STOPPED) {
+            results = super.getValidationErrors(serviceIdentifiersNotToValidate);
+        }
+        return results != null ? results : Collections.<ValidationResult>emptySet();
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/8a18d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 4aa9ab6..e8e3810 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -395,4 +395,13 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
             LOG.error("Failed to invoke @OnDisabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
         }
     }
+
+    @Override
+    public Collection<ValidationResult> getValidationErrors(Set<String> serviceIdentifiersNotToValidate) {
+        Collection<ValidationResult> results = null;
+        if (stateRef.get() == ControllerServiceState.DISABLED) {
+            results = super.getValidationErrors(serviceIdentifiersNotToValidate);
+        }
+        return results != null ? results : Collections.<ValidationResult>emptySet();
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/8a18d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 9f6f783..dd44bc8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -33,6 +33,7 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.groups.ProcessGroup;
@@ -350,7 +351,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
             final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
             final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
             final String dataSize = FormatUtils.formatDataSize(bytesReceived);
-            logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{
+            logger.info("{} Successfully received {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{
                     this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate});
         }
 
@@ -364,31 +365,41 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
 
     @Override
     public boolean isValid() {
-        return getValidationErrors().isEmpty();
+        if (!targetExists.get()) {
+            return false;
+        }
+
+        if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) {
+            // if it's an output port, ensure that there is an outbound connection
+            return false;
+        }
+
+        return true;
     }
 
     @Override
     public Collection<ValidationResult> getValidationErrors() {
         final Collection<ValidationResult> validationErrors = new ArrayList<>();
-        ValidationResult error = null;
-        if (!targetExists.get()) {
-            error = new ValidationResult.Builder()
-            .explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName()))
-            .subject(String.format("Remote port '%s'", getName()))
-            .valid(false)
-            .build();
-        } else if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) {
-            error = new ValidationResult.Builder()
-            .explanation(String.format("Port '%s' has no outbound connections", getName()))
-            .subject(String.format("Remote port '%s'", getName()))
-            .valid(false)
-            .build();
-        }
+        if (getScheduledState() == ScheduledState.STOPPED) {
+            ValidationResult error = null;
+            if (!targetExists.get()) {
+                error = new ValidationResult.Builder()
+                        .explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName()))
+                        .subject(String.format("Remote port '%s'", getName()))
+                        .valid(false)
+                        .build();
+            } else if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) {
+                error = new ValidationResult.Builder()
+                        .explanation(String.format("Port '%s' has no outbound connections", getName()))
+                        .subject(String.format("Remote port '%s'", getName()))
+                        .valid(false)
+                        .build();
+            }
 
-        if (error != null) {
-            validationErrors.add(error);
+            if (error != null) {
+                validationErrors.add(error);
+            }
         }
-
         return validationErrors;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/8a18d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
index 66fd303..71932f7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
@@ -44,6 +44,7 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.controller.AbstractPort;
 import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.events.BulletinFactory;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.groups.ProcessGroup;
@@ -262,13 +263,15 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
     @Override
     public Collection<ValidationResult> getValidationErrors() {
         final Collection<ValidationResult> validationErrors = new ArrayList<>();
-        if (!isValid()) {
-            final ValidationResult error = new ValidationResult.Builder()
-                    .explanation(String.format("Output connection for port '%s' is not defined.", getName()))
-                    .subject(String.format("Port '%s'", getName()))
-                    .valid(false)
-                    .build();
-            validationErrors.add(error);
+        if (getScheduledState() == ScheduledState.STOPPED) {
+            if (!isValid()) {
+                final ValidationResult error = new ValidationResult.Builder()
+                        .explanation(String.format("Output connection for port '%s' is not defined.", getName()))
+                        .subject(String.format("Port '%s'", getName()))
+                        .valid(false)
+                        .build();
+                validationErrors.add(error);
+            }
         }
         return validationErrors;
     }