You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/11/21 22:15:10 UTC

nifi git commit: NIFI-2996 - validate processors only when they are in STOPPED state - report validation errors via REST API on processors/services/tasks/ports only when they are in the STOPPED state - This closes #1192

Repository: nifi
Updated Branches:
  refs/heads/master fb9cbccc3 -> 15af764dd


NIFI-2996
- validate processors only when they are in STOPPED state
- report validation errors via REST API on processors/services/tasks/ports only when they are in the STOPPED state
- This closes #1192


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/15af764d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/15af764d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/15af764d

Branch: refs/heads/master
Commit: 15af764dd8ad8c4a6789c556635e817a2a553828
Parents: fb9cbcc
Author: Mike Moser <mo...@apache.org>
Authored: Wed Nov 9 12:38:04 2016 -0500
Committer: Matt Gilman <ma...@gmail.com>
Committed: Mon Nov 21 17:02:25 2016 -0500

----------------------------------------------------------------------
 .../apache/nifi/controller/FlowController.java  | 12 +--
 .../nifi/controller/StandardProcessorNode.java  | 80 ++++++++++----------
 .../reporting/AbstractReportingTaskNode.java    | 10 +++
 .../service/StandardControllerServiceNode.java  |  9 +++
 .../controller/TestStandardProcessorNode.java   | 16 ++++
 .../nifi/remote/StandardRemoteGroupPort.java    | 41 +++++-----
 .../nifi/remote/StandardRootGroupPort.java      | 17 +++--
 7 files changed, 115 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/15af764d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 48b9c14..adab649 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2722,16 +2722,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             status.setFlowFilesRemoved(entry.getFlowFilesRemoved());
         }
 
-        // determine the run status and get any validation errors... must check
-        // is valid when not disabled since a processors validity could change due
-        // to environmental conditions (property configured with a file path and
-        // the file being externally removed)
+        // Determine the run status and get any validation errors... only validating while STOPPED
+        // is a trade-off we are willing to make, even though processor validity could change due to
+        // environmental conditions (property configured with a file path and the file being externally
+        // removed). This saves on validation costs that would be unnecessary most of the time.
         if (ScheduledState.DISABLED.equals(procNode.getScheduledState())) {
             status.setRunStatus(RunStatus.Disabled);
-        } else if (!procNode.isValid()) {
-            status.setRunStatus(RunStatus.Invalid);
         } else if (ScheduledState.RUNNING.equals(procNode.getScheduledState())) {
             status.setRunStatus(RunStatus.Running);
+        } else if (!procNode.isValid()) {
+            status.setRunStatus(RunStatus.Invalid);
         } else {
             status.setRunStatus(RunStatus.Stopped);
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/15af764d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index f94cc8b..3d2c222 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -987,50 +987,54 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
     public Collection<ValidationResult> getValidationErrors() {
         final List<ValidationResult> results = new ArrayList<>();
         try {
-            final ValidationContext validationContext = this.getValidationContextFactory()
-                .newValidationContext(getProperties(), getAnnotationData(), getProcessGroup().getIdentifier(), getIdentifier());
-
-            final Collection<ValidationResult> validationResults;
-            try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass(), processor.getIdentifier())) {
-                validationResults = getProcessor().validate(validationContext);
-            }
-
-            for (final ValidationResult result : validationResults) {
-                if (!result.isValid()) {
-                    results.add(result);
+            // Processors may go invalid while RUNNING, but only validating while STOPPED is a trade-off
+            // we are willing to make in order to save on validation costs that would be unnecessary most of the time.
+            if (getScheduledState() == ScheduledState.STOPPED) {
+                final ValidationContext validationContext = this.getValidationContextFactory()
+                        .newValidationContext(getProperties(), getAnnotationData(), getProcessGroup().getIdentifier(), getIdentifier());
+
+                final Collection<ValidationResult> validationResults;
+                try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass(), processor.getIdentifier())) {
+                    validationResults = getProcessor().validate(validationContext);
                 }
-            }
 
-            for (final Relationship relationship : getUndefinedRelationships()) {
-                if (!isAutoTerminated(relationship)) {
-                    final ValidationResult error = new ValidationResult.Builder()
-                            .explanation("Relationship '" + relationship.getName()
-                                    + "' is not connected to any component and is not auto-terminated")
-                            .subject("Relationship " + relationship.getName()).valid(false).build();
-                    results.add(error);
+                for (final ValidationResult result : validationResults) {
+                    if (!result.isValid()) {
+                        results.add(result);
+                    }
                 }
-            }
 
-            switch (getInputRequirement()) {
-            case INPUT_ALLOWED:
-                break;
-            case INPUT_FORBIDDEN: {
-                final int incomingConnCount = getIncomingNonLoopConnections().size();
-                if (incomingConnCount != 0) {
-                    results.add(new ValidationResult.Builder().explanation(
-                            "Processor does not allow upstream connections but currently has " + incomingConnCount)
-                            .subject("Upstream Connections").valid(false).build());
+                for (final Relationship relationship : getUndefinedRelationships()) {
+                    if (!isAutoTerminated(relationship)) {
+                        final ValidationResult error = new ValidationResult.Builder()
+                                .explanation("Relationship '" + relationship.getName()
+                                        + "' is not connected to any component and is not auto-terminated")
+                                .subject("Relationship " + relationship.getName()).valid(false).build();
+                        results.add(error);
+                    }
                 }
-                break;
-            }
-            case INPUT_REQUIRED: {
-                if (getIncomingNonLoopConnections().isEmpty()) {
-                    results.add(new ValidationResult.Builder()
-                            .explanation("Processor requires an upstream connection but currently has none")
-                            .subject("Upstream Connections").valid(false).build());
+
+                switch (getInputRequirement()) {
+                    case INPUT_ALLOWED:
+                        break;
+                    case INPUT_FORBIDDEN: {
+                        final int incomingConnCount = getIncomingNonLoopConnections().size();
+                        if (incomingConnCount != 0) {
+                            results.add(new ValidationResult.Builder().explanation(
+                                    "Processor does not allow upstream connections but currently has " + incomingConnCount)
+                                    .subject("Upstream Connections").valid(false).build());
+                        }
+                        break;
+                    }
+                    case INPUT_REQUIRED: {
+                        if (getIncomingNonLoopConnections().isEmpty()) {
+                            results.add(new ValidationResult.Builder()
+                                    .explanation("Processor requires an upstream connection but currently has none")
+                                    .subject("Upstream Connections").valid(false).build());
+                        }
+                        break;
+                    }
                 }
-                break;
-            }
             }
         } catch (final Throwable t) {
             results.add(new ValidationResult.Builder().explanation("Failed to run validation due to " + t.toString())

http://git-wip-us.apache.org/repos/asf/nifi/blob/15af764d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
index ac5525c..deca385 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
@@ -34,6 +34,7 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.util.FormatUtils;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -241,4 +242,13 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
     public String getProcessGroupIdentifier() {
         return null;
     }
+
+    @Override
+    public Collection<ValidationResult> getValidationErrors(Set<String> serviceIdentifiersNotToValidate) {
+        Collection<ValidationResult> results = null;
+        if (getScheduledState() == ScheduledState.STOPPED) {
+            results = super.getValidationErrors(serviceIdentifiersNotToValidate);
+        }
+        return results != null ? results : Collections.emptySet();
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/15af764d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index d4a16b3..2c5b096 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -438,4 +438,13 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
         final ProcessGroup procGroup = getProcessGroup();
         return procGroup == null ? null : procGroup.getIdentifier();
     }
+
+    @Override
+    public Collection<ValidationResult> getValidationErrors(Set<String> serviceIdentifiersNotToValidate) {
+        Collection<ValidationResult> results = null;
+        if (stateRef.get() == ControllerServiceState.DISABLED) {
+            results = super.getValidationErrors(serviceIdentifiersNotToValidate);
+        }
+        return results != null ? results : Collections.emptySet();
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/15af764d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
index ce89ba5..e9623e3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
@@ -118,6 +118,22 @@ public class TestStandardProcessorNode {
     }
 
     @Test
+    public void testDisabledValidationErrors() {
+        final ModifiesClasspathNoAnnotationProcessor processor = new ModifiesClasspathNoAnnotationProcessor();
+        final StandardProcessorNode procNode = createProcessorNode(processor);
+
+        // Set a property to an invalid value
+        final Map<String, String> properties = new HashMap<>();
+        properties.put(ModifiesClasspathNoAnnotationProcessor.CLASSPATH_RESOURCE.getName(), "");
+        procNode.setProperties(properties);
+        Assert.assertTrue(procNode.getValidationErrors().size() > 0);
+
+        // Disabled processors skip property validation
+        procNode.disable();
+        Assert.assertFalse(procNode.getValidationErrors().size() > 0);
+    }
+
+    @Test
     public void testSinglePropertyDynamicallyModifiesClasspath() throws MalformedURLException {
         final PropertyDescriptor classpathProp = new PropertyDescriptor.Builder().name("Classpath Resources")
                 .dynamicallyModifiesClasspath(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();

http://git-wip-us.apache.org/repos/asf/nifi/blob/15af764d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index e8b542e..48d60d6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -34,6 +34,7 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.groups.ProcessGroup;
@@ -367,7 +368,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
             final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
             final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
             final String dataSize = FormatUtils.formatDataSize(bytesReceived);
-            logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{
+            logger.info("{} Successfully received {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{
                 this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate});
         }
 
@@ -381,31 +382,33 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
 
     @Override
     public boolean isValid() {
-        return getValidationErrors().isEmpty();
+        return targetExists.get()
+                && (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT ? !getConnections(Relationship.ANONYMOUS).isEmpty() : true);
     }
 
     @Override
     public Collection<ValidationResult> getValidationErrors() {
         final Collection<ValidationResult> validationErrors = new ArrayList<>();
-        ValidationResult error = null;
-        if (!targetExists.get()) {
-            error = new ValidationResult.Builder()
-                    .explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName()))
-                    .subject(String.format("Remote port '%s'", getName()))
-                    .valid(false)
-                    .build();
-        } else if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) {
-            error = new ValidationResult.Builder()
-                    .explanation(String.format("Port '%s' has no outbound connections", getName()))
-                    .subject(String.format("Remote port '%s'", getName()))
-                    .valid(false)
-                    .build();
-        }
+        if (getScheduledState() == ScheduledState.STOPPED) {
+            ValidationResult error = null;
+            if (!targetExists.get()) {
+                error = new ValidationResult.Builder()
+                        .explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName()))
+                        .subject(String.format("Remote port '%s'", getName()))
+                        .valid(false)
+                        .build();
+            } else if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) {
+                error = new ValidationResult.Builder()
+                        .explanation(String.format("Port '%s' has no outbound connections", getName()))
+                        .subject(String.format("Remote port '%s'", getName()))
+                        .valid(false)
+                        .build();
+            }
 
-        if (error != null) {
-            validationErrors.add(error);
+            if (error != null) {
+                validationErrors.add(error);
+            }
         }
-
         return validationErrors;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/15af764d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
index 4c78dd5..5cb26ea 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
@@ -30,6 +30,7 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.controller.AbstractPort;
 import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.events.BulletinFactory;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.groups.ProcessGroup;
@@ -273,13 +274,15 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
     @Override
     public Collection<ValidationResult> getValidationErrors() {
         final Collection<ValidationResult> validationErrors = new ArrayList<>();
-        if (!isValid()) {
-            final ValidationResult error = new ValidationResult.Builder()
-                    .explanation(String.format("Output connection for port '%s' is not defined.", getName()))
-                    .subject(String.format("Port '%s'", getName()))
-                    .valid(false)
-                    .build();
-            validationErrors.add(error);
+        if (getScheduledState() == ScheduledState.STOPPED) {
+            if (!isValid()) {
+                final ValidationResult error = new ValidationResult.Builder()
+                        .explanation(String.format("Output connection for port '%s' is not defined.", getName()))
+                        .subject(String.format("Port '%s'", getName()))
+                        .valid(false)
+                        .build();
+                validationErrors.add(error);
+            }
         }
         return validationErrors;
     }