You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2019/01/10 16:17:47 UTC

[GitHub] asfgit closed pull request #3259: NIFI-5944: When components are started on NiFi startup, if they are i…

asfgit closed pull request #3259: NIFI-5944: When components are started on NiFi startup, if they are i…
URL: https://github.com/apache/nifi/pull/3259
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this pull request comes from a fork, its diff is reproduced below,
because GitHub does not otherwise show it once the pull request is merged:

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
index f3ae41fc1e..e17682eeed 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
@@ -373,9 +373,8 @@ public String toString() {
     }
 
     @Override
-    public final void performValidation() {
-        boolean replaced = false;
-        do {
+    public final ValidationStatus performValidation() {
+        while (true) {
             final ValidationState validationState = getValidationState();
 
             final ValidationContext validationContext = getValidationContext();
@@ -391,8 +390,11 @@ public final void performValidation() {
 
             final ValidationStatus status = results.isEmpty() ? ValidationStatus.VALID : ValidationStatus.INVALID;
             final ValidationState updatedState = new ValidationState(status, results);
-            replaced = replaceValidationState(validationState, updatedState);
-        } while (!replaced);
+            final boolean replaced = replaceValidationState(validationState, updatedState);
+            if (replaced) {
+                return status;
+            }
+        }
     }
 
     protected Collection<ValidationResult> computeValidationErrors(final ValidationContext validationContext) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
index d0ed572515..2357d41cd0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
@@ -179,7 +179,7 @@ public default void setProperties(Map<String, String> properties) {
     /**
      * Asynchronously begins the validation process
      */
-    public abstract void performValidation();
+    public abstract ValidationStatus performValidation();
 
     /**
      * Returns a {@link List} of all {@link PropertyDescriptor}s that this
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index 6e8206eb71..12eeb88f23 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.controller;
 
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.controller.scheduling.LifecycleState;
@@ -144,7 +145,13 @@ public ProcessorNode(final String id,
     public ScheduledState getScheduledState() {
         ScheduledState sc = this.scheduledState.get();
         if (sc == ScheduledState.STARTING) {
-            return ScheduledState.RUNNING;
+            final ValidationStatus validationStatus = getValidationStatus();
+
+            if (validationStatus == ValidationStatus.INVALID) {
+                return ScheduledState.STOPPED;
+            } else {
+                return ScheduledState.RUNNING;
+            }
         } else if (sc == ScheduledState.STOPPING) {
             return ScheduledState.STOPPED;
         }
@@ -240,4 +247,12 @@ public abstract void start(ScheduledExecutorService scheduler, long administrati
      * will result in the WARN message if processor can not be enabled.
      */
     public abstract void disable();
+
+    /**
+     * Returns the Scheduled State that is desired for this Processor. This may vary from the current state if the Processor is not
+     * currently valid, is in the process of stopping but should then transition to Running, etc.
+     *
+     * @return the desired state for this Processor
+     */
+    public abstract ScheduledState getDesiredState();
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index dad01b8a24..8ab7e69d54 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -912,7 +912,6 @@ public void trigger(final ComponentNode component) {
 
                     try {
                         if (connectable instanceof ProcessorNode) {
-                            ((ProcessorNode) connectable).getValidationStatus(5, TimeUnit.SECONDS);
                             connectable.getProcessGroup().startProcessor((ProcessorNode) connectable, true);
                         } else {
                             startConnectable(connectable);
@@ -1242,7 +1241,7 @@ public ScheduledState getScheduledState(final ProcessorNode procNode) {
                         return ScheduledState.RUNNING;
                     }
 
-                    return procNode.getScheduledState();
+                    return procNode.getDesiredState();
                 }
 
                 @Override
@@ -1699,7 +1698,6 @@ public void startReportingTask(final ReportingTaskNode reportingTaskNode) {
             throw new IllegalStateException("Cannot start reporting task " + reportingTaskNode.getIdentifier() + " because the controller is terminated");
         }
 
-        reportingTaskNode.performValidation(); // ensure that the reporting task has completed its validation before attempting to start it
         reportingTaskNode.verifyCanStart();
         reportingTaskNode.reloadAdditionalResourcesIfNecessary();
         processScheduler.schedule(reportingTaskNode);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 4adfad49b9..8ca51731e2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -38,6 +38,7 @@
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.validation.ValidationState;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.ConnectableType;
@@ -139,7 +140,7 @@
     private final ProcessScheduler processScheduler;
     private long runNanos = 0L;
     private volatile long yieldNanos;
-    private volatile ScheduledState desiredState;
+    private volatile ScheduledState desiredState = ScheduledState.STOPPED;
     private volatile LogLevel bulletinLevel = LogLevel.WARN;
 
     private SchedulingStrategy schedulingStrategy; // guarded by read/write lock
@@ -1343,13 +1344,6 @@ public void disable() {
     public void start(final ScheduledExecutorService taskScheduler, final long administrativeYieldMillis, final long timeoutMillis, final ProcessContext processContext,
             final SchedulingAgentCallback schedulingAgentCallback, final boolean failIfStopping) {
 
-        switch (getValidationStatus()) {
-            case INVALID:
-                throw new IllegalStateException("Processor " + this.getName() + " is not in a valid state due to " + this.getValidationErrors());
-            case VALIDATING:
-                throw new IllegalStateException("Processor " + this.getName() + " cannot be started because its validation is still being performed");
-        }
-
         final Processor processor = processorRef.get().getProcessor();
         final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
 
@@ -1487,6 +1481,25 @@ private void initiateStart(final ScheduledExecutorService taskScheduler, final l
 
         // Create a task to invoke the @OnScheduled annotation of the processor
         final Callable<Void> startupTask = () -> {
+            final ScheduledState currentScheduleState = scheduledState.get();
+            if (currentScheduleState == ScheduledState.STOPPING || currentScheduleState == ScheduledState.STOPPED) {
+                LOG.debug("{} is stopped. Will not call @OnScheduled lifecycle methods or begin trigger onTrigger() method", StandardProcessorNode.this);
+                schedulingAgentCallback.onTaskComplete();
+                return null;
+            }
+
+            final ValidationStatus validationStatus = getValidationStatus();
+            if (validationStatus != ValidationStatus.VALID) {
+                LOG.debug("Cannot start {} because Processor is currently not valid; will try again after 5 seconds", StandardProcessorNode.this);
+
+                // re-initiate the entire process
+                final Runnable initiateStartTask = () -> initiateStart(taskScheduler, administrativeYieldMillis, timeoutMilis, processContext, schedulingAgentCallback);
+                taskScheduler.schedule(initiateStartTask, 5, TimeUnit.SECONDS);
+
+                schedulingAgentCallback.onTaskComplete();
+                return null;
+            }
+
             LOG.debug("Invoking @OnScheduled methods of {}", processor);
 
             // Now that the task has been scheduled, set the timeout
@@ -1696,6 +1709,10 @@ public void run() {
         return future;
     }
 
+    @Override
+    public ScheduledState getDesiredState() {
+        return desiredState;
+    }
 
     private void monitorAsyncTask(final Future<?> taskFuture, final Future<?> monitoringFuture, final long completionTimestamp) {
         if (taskFuture.isDone()) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
index bce85f86e3..f1b585e5f4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
@@ -16,16 +16,11 @@
  */
 package org.apache.nifi.controller.reporting;
 
-import java.net.URL;
-import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.nifi.annotation.configuration.DefaultSchedule;
 import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.controller.AbstractComponentNode;
 import org.apache.nifi.controller.ConfigurationContext;
@@ -51,6 +46,12 @@
 import org.slf4j.LoggerFactory;
 import org.springframework.core.annotation.AnnotationUtils;
 
+import java.net.URL;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
 public abstract class AbstractReportingTaskNode extends AbstractComponentNode implements ReportingTaskNode {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractReportingTaskNode.class);
@@ -176,7 +177,7 @@ public boolean isRunning() {
 
     @Override
     public boolean isValidationNecessary() {
-        return !processScheduler.isScheduled(this);
+        return !processScheduler.isScheduled(this) || getValidationStatus() != ValidationStatus.VALID;
     }
 
     @Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 2ff3307533..a7d5fd8a1d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -21,6 +21,7 @@
 import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Funnel;
 import org.apache.nifi.connectable.Port;
@@ -186,13 +187,6 @@ public void schedule(final ReportingTaskNode taskNode) {
             throw new IllegalStateException("Reporting Task " + taskNode.getName() + " cannot be started because it has " + activeThreadCount + " threads still running");
         }
 
-        switch (taskNode.getValidationStatus()) {
-            case INVALID:
-                throw new IllegalStateException("Reporting Task " + taskNode.getName() + " is not in a valid state for the following reasons: " + taskNode.getValidationErrors());
-            case VALIDATING:
-                throw new IllegalStateException("Reporting Task " + taskNode.getName() + " cannot be scheduled because it is in the process of validating its configuration");
-        }
-
         final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy());
         lifecycleState.setScheduled(true);
 
@@ -216,6 +210,13 @@ public void run() {
                             return;
                         }
 
+                        final ValidationStatus validationStatus = taskNode.getValidationStatus();
+                        if (validationStatus != ValidationStatus.VALID) {
+                            LOG.debug("Cannot schedule {} to run because it is currently invalid. Will try again in 5 seconds", taskNode);
+                            componentLifeCycleThreadPool.schedule(this, 5, TimeUnit.SECONDS);
+                            return;
+                        }
+
                         try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), reportingTask.getClass(), reportingTask.getIdentifier())) {
                             ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, reportingTask, taskNode.getConfigurationContext());
                         }
@@ -231,8 +232,11 @@ public void run() {
                             + "ReportingTask and will attempt to schedule it again after {}",
                             new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
 
-                    ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, taskNode.getConfigurationContext());
-                    ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, reportingTask, taskNode.getConfigurationContext());
+
+                    try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), reportingTask.getClass(), reportingTask.getIdentifier())) {
+                        ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, taskNode.getConfigurationContext());
+                        ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, reportingTask, taskNode.getConfigurationContext());
+                    }
 
                     componentLifeCycleThreadPool.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS);
                 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java
index 39693b8987..4a6b4206b6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java
@@ -30,7 +30,7 @@
     public static final ScheduledStateLookup IDENTITY_LOOKUP = new ScheduledStateLookup() {
         @Override
         public ScheduledState getScheduledState(final ProcessorNode procNode) {
-            return procNode.getScheduledState();
+            return procNode.getDesiredState();
         }
 
         @Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 795fc8c7d2..3d6329f8fa 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -16,25 +16,6 @@
  */
 package org.apache.nifi.controller.service;
 
-import java.lang.reflect.InvocationTargetException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.Restricted;
 import org.apache.nifi.annotation.documentation.DeprecationNotice;
@@ -47,8 +28,7 @@
 import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.validation.ValidationState;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.controller.AbstractComponentNode;
 import org.apache.nifi.controller.ComponentNode;
@@ -71,6 +51,24 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 public class StandardControllerServiceNode extends AbstractComponentNode implements ControllerServiceNode {
 
     private static final Logger LOG = LoggerFactory.getLogger(StandardControllerServiceNode.class);
@@ -319,14 +317,6 @@ public void verifyCanEnable() {
         if (getState() != ControllerServiceState.DISABLED) {
             throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not disabled");
         }
-
-        final ValidationState validationState = getValidationState();
-        switch (validationState.getStatus()) {
-            case INVALID:
-                throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not valid: " + validationState.getValidationErrors());
-            case VALIDATING:
-                throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because its validation has not yet completed");
-        }
     }
 
     @Override
@@ -334,11 +324,6 @@ public void verifyCanEnable(final Set<ControllerServiceNode> ignoredReferences)
         if (getState() != ControllerServiceState.DISABLED) {
             throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not disabled");
         }
-
-        final Collection<ValidationResult> validationErrors = getValidationErrors(ignoredReferences);
-        if (ignoredReferences != null && !validationErrors.isEmpty()) {
-            throw new IllegalStateException("Controller Service with ID " + getIdentifier() + " cannot be enabled because it is not currently valid: " + validationErrors);
-        }
     }
 
     @Override
@@ -389,8 +374,11 @@ public boolean isValidationNecessary() {
             case DISABLED:
             case DISABLING:
                 return true;
-            case ENABLED:
             case ENABLING:
+                // If enabling and currently not valid, then we must trigger validation to occur. This allows the #enable method
+                // to continue running in the background and complete enabling when the service becomes valid.
+                return getValidationStatus() != ValidationStatus.VALID;
+            case ENABLED:
             default:
                 return false;
         }
@@ -398,7 +386,7 @@ public boolean isValidationNecessary() {
 
     /**
      * Will atomically enable this service by invoking its @OnEnabled operation.
-     * It uses CAS operation on {@link #stateRef} to transition this service
+     * It uses CAS operation on {@link #stateTransition} to transition this service
      * from DISABLED to ENABLING state. If such transition succeeds the service
      * will be marked as 'active' (see {@link ControllerServiceNode#isActive()}).
      * If such transition doesn't succeed then no enabling logic will be
@@ -429,6 +417,20 @@ public boolean isValidationNecessary() {
             scheduler.execute(new Runnable() {
                 @Override
                 public void run() {
+                    if (!isActive()) {
+                        LOG.debug("{} is no longer active so will not attempt to enable it", StandardControllerServiceNode.this);
+                        stateTransition.disable();
+                        return;
+                    }
+
+                    final ValidationStatus validationStatus = getValidationStatus();
+                    if (validationStatus != ValidationStatus.VALID) {
+                        LOG.debug("Cannot enable {} because it is not currently valid. Will try again in 5 seconds", StandardControllerServiceNode.this);
+                        scheduler.schedule(this, 5, TimeUnit.SECONDS);
+                        future.completeExceptionally(new RuntimeException(this + " cannot be enabled because it is not currently valid. Will try again in 5 seconds."));
+                        return;
+                    }
+
                     try {
                         try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), getControllerServiceImplementation().getClass(), getIdentifier())) {
                             ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, getControllerServiceImplementation(), configContext);
@@ -446,7 +448,7 @@ public void run() {
                             invokeDisable(configContext);
                             stateTransition.disable();
                         } else {
-                            LOG.debug("Successfully enabled {}", service);
+                            LOG.info("Successfully enabled {}", service);
                         }
                     } catch (Exception e) {
                         future.completeExceptionally(e);
@@ -478,7 +480,7 @@ public void run() {
 
     /**
      * Will atomically disable this service by invoking its @OnDisabled operation.
-     * It uses CAS operation on {@link #stateRef} to transition this service
+     * It uses CAS operation on {@link #stateTransition} to transition this service
      * from ENABLED to DISABLING state. If such transition succeeds the service
      * will be de-activated (see {@link ControllerServiceNode#isActive()}).
      * If such transition doesn't succeed (the service is still in ENABLING state)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
index 920999e4d6..242c0ada87 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
@@ -24,6 +24,7 @@
 import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
@@ -92,6 +93,7 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -342,6 +344,8 @@ public void validateServiceEnablementLogicHappensOnlyOnce() throws Exception {
         final ControllerServiceNode serviceNode = flowManager.createControllerService(SimpleTestService.class.getName(),
                 "1", systemBundle.getBundleDetails().getCoordinate(), null, false, true);
 
+        serviceNode.performValidation();
+
         assertFalse(serviceNode.isActive());
         final SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
         final ExecutorService executor = Executors.newCachedThreadPool();
@@ -361,10 +365,10 @@ public void run() {
                 }
             });
         }
-        // need to sleep a while since we are emulating async invocations on
-        // method that is also internally async
-        Thread.sleep(500);
+
         executor.shutdown();
+        executor.awaitTermination(10, TimeUnit.SECONDS);
+
         assertFalse(asyncFailed.get());
         assertEquals(1, ts.enableInvocationCount());
     }
@@ -399,10 +403,9 @@ public void run() {
                 }
             });
         }
-        // need to sleep a while since we are emulating async invocations on
-        // method that is also internally async
-        Thread.sleep(500);
+
         executor.shutdown();
+        executor.awaitTermination(10, TimeUnit.SECONDS);
         assertFalse(asyncFailed.get());
         assertEquals(0, ts.disableInvocationCount());
     }
@@ -419,8 +422,10 @@ public void validateEnabledServiceCanOnlyBeDisabledOnce() throws Exception {
         final ControllerServiceNode serviceNode = flowManager.createControllerService(SimpleTestService.class.getName(),
                 "1", systemBundle.getBundleDetails().getCoordinate(), null, false, true);
 
+        assertSame(ValidationStatus.VALID, serviceNode.performValidation());
+
         final SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
-        scheduler.enableControllerService(serviceNode);
+        scheduler.enableControllerService(serviceNode).get();
         assertTrue(serviceNode.isActive());
         final ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -441,8 +446,8 @@ public void run() {
         }
         // need to sleep a while since we are emulating async invocations on
         // method that is also internally async
-        Thread.sleep(500);
         executor.shutdown();
+        executor.awaitTermination(10, TimeUnit.SECONDS); // change to seconds.
         assertFalse(asyncFailed.get());
         assertEquals(1, ts.disableInvocationCount());
     }
@@ -453,9 +458,17 @@ public void validateDisablingOfTheFailedService() throws Exception {
 
         final ControllerServiceNode serviceNode = flowManager.createControllerService(FailingService.class.getName(),
                 "1", systemBundle.getBundleDetails().getCoordinate(), null, false, true);
-        scheduler.enableControllerService(serviceNode);
-        Thread.sleep(1000);
+        serviceNode.performValidation();
+
+        final Future<?> future = scheduler.enableControllerService(serviceNode);
+        try {
+            future.get();
+        } catch (final Exception e) {
+            // Expected behavior because the FailingService throws Exception when attempting to enable
+        }
+
         scheduler.shutdown();
+
         /*
          * Because it was never disabled it will remain active since its
          * enabling is being retried. This may actually be a bug in the
@@ -528,14 +541,20 @@ public void validateNeverEnablingServiceCanStillBeDisabled() throws Exception {
 
         final ControllerServiceNode serviceNode = flowManager.createControllerService(LongEnablingService.class.getName(),
                 "1", systemBundle.getBundleDetails().getCoordinate(), null, false, true);
+
         final LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation();
         ts.setLimit(Long.MAX_VALUE);
+
+        serviceNode.performValidation();
         scheduler.enableControllerService(serviceNode);
-        Thread.sleep(100);
+
         assertTrue(serviceNode.isActive());
+        final long maxTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
+        while (ts.enableInvocationCount() != 1 && System.nanoTime() <= maxTime) {
+            Thread.sleep(1L);
+        }
         assertEquals(1, ts.enableInvocationCount());
 
-        Thread.sleep(1000);
         scheduler.disableControllerService(serviceNode);
         assertFalse(serviceNode.isActive());
         assertEquals(ControllerServiceState.DISABLING, serviceNode.getState());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
index daee3c8e57..ebeb916b02 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
@@ -21,6 +21,7 @@
 import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.controller.ExtensionBuilder;
 import org.apache.nifi.controller.FlowController;
@@ -73,6 +74,7 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
 public class TestStandardControllerServiceProvider {
@@ -197,8 +199,8 @@ public void testDisableControllerService() {
         provider.disableControllerService(serviceNode);
     }
 
-    @Test(timeout = 1000000)
-    public void testEnableDisableWithReference() {
+    @Test(timeout = 10000)
+    public void testEnableDisableWithReference() throws InterruptedException {
         final ProcessGroup group = new MockProcessGroup(controller);
         final FlowController controller = Mockito.mock(FlowController.class);
         final FlowManager flowManager = Mockito.mock(FlowManager.class);
@@ -221,17 +223,24 @@ public void testEnableDisableWithReference() {
 
         try {
             provider.enableControllerService(serviceNodeA);
-            Assert.fail("Was able to enable Service A but Service B is disabled.");
         } catch (final IllegalStateException expected) {
         }
 
+        assertSame(ControllerServiceState.ENABLING, serviceNodeA.getState());
+
         serviceNodeB.performValidation();
-        serviceNodeB.getValidationStatus(5, TimeUnit.SECONDS);
+        assertSame(ValidationStatus.VALID, serviceNodeB.getValidationStatus(5, TimeUnit.SECONDS));
         provider.enableControllerService(serviceNodeB);
 
         serviceNodeA.performValidation();
-        serviceNodeA.getValidationStatus(5, TimeUnit.SECONDS);
-        provider.enableControllerService(serviceNodeA);
+        assertSame(ValidationStatus.VALID, serviceNodeA.getValidationStatus(5, TimeUnit.SECONDS));
+
+        final long maxTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
+        // Wait for Service A to become ENABLED. This will happen in a background thread after approximately 5 seconds, now that Service A is valid.
+        while (serviceNodeA.getState() != ControllerServiceState.ENABLED && System.nanoTime() <= maxTime) {
+            Thread.sleep(5L);
+        }
+        assertSame(ControllerServiceState.ENABLED, serviceNodeA.getState());
 
         try {
             provider.disableControllerService(serviceNodeB);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services