You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/12/31 21:37:33 UTC

[1/5] nifi git commit: NIFI-1164 Fixed race condition and refactored

Repository: nifi
Updated Branches:
  refs/heads/master 03730adb3 -> 8e031c987


NIFI-1164 Fixed race condition and refactored

Changed ControllerServiceNode by adding enable(..), disable(..) and isActive() operations. See javadocs for more details in both ControllerServiceNode and StandardControllerServiceNode

Refactored service enable/disable logic in StandardProcessScheduler and StandardControllerServiceNode. Below are some of the notes:
- No need for resetting class loader since it's going to derive from the class loader of the service. In other words, any classes that aren’t loaded and will be loaded within the scope of the already loaded service will be loaded by the class loader of that service
- No need to control 'scheduleState.isScheduled()' since the logic has changed to use a CAS operation on state update and the service state change is now atomic.
- Removed Thread.sleep(..) and while(true) loop in favor of rescheduling retries, achieving better thread utilization since the thread that would normally block in Thread.sleep(..) is now reused.
- Added tests and validated that the race condition is no longer happening

Added additional logic that allows the initiation of the service disabling while it is in ENABLING state. See javadoc of StandardProcessScheduler.enable/disable for more details.

NIFI-1164 polishing


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/909c0dec
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/909c0dec
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/909c0dec

Branch: refs/heads/master
Commit: 909c0decd68a54f36eb61f815d10bd34e6a5b862
Parents: ebcefaa
Author: Oleg Zhurakousky <ol...@suitcase.io>
Authored: Tue Dec 15 13:21:00 2015 -0500
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Mon Dec 28 09:22:23 2015 -0500

----------------------------------------------------------------------
 .../service/ControllerServiceNode.java          |  41 ++-
 .../scheduling/StandardProcessScheduler.java    | 151 +----------
 .../service/StandardControllerServiceNode.java  | 122 ++++++++-
 .../StandardControllerServiceProvider.java      |  35 +--
 .../StandardControllerServiceReference.java     |   3 +-
 .../TestStandardProcessScheduler.java           | 253 +++++++++++++++++++
 .../TestStandardControllerServiceProvider.java  |   2 +-
 7 files changed, 429 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/909c0dec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index 481a231..b229af0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -17,9 +17,11 @@
 package org.apache.nifi.controller.service;
 
 import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.nifi.controller.ConfiguredComponent;
 import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.Heartbeater;
 
 public interface ControllerServiceNode extends ConfiguredComponent {
 
@@ -53,11 +55,31 @@ public interface ControllerServiceNode extends ConfiguredComponent {
      */
     ControllerServiceState getState();
 
+     /**
+     * Will enable this service. Enabling of the service typically means
+     * invoking it's operation that is annotated with @OnEnabled.
+     *
+     * @param scheduler
+     *            implementation of {@link ScheduledExecutorService} used to
+     *            initiate service enabling task as well as its re-tries
+     * @param administrativeYieldMillis
+     *            the amount of milliseconds to wait for administrative yield
+     * @param heartbeater
+     *            the instance of {@link Heartbeater}
+     */
+    void enable(ScheduledExecutorService scheduler, long administrativeYieldMillis, Heartbeater heartbeater);
+
     /**
-     * Updates the state of the Controller Service to the provided new state
-     * @param state the state to set the service to
+     * Will disable this service. Disabling of the service typically means
+     * invoking it's operation that is annotated with @OnDisabled.
+     *
+     * @param scheduler
+     *            implementation of {@link ScheduledExecutorService} used to
+     *            initiate service disabling task
+     * @param heartbeater
+     *            the instance of {@link Heartbeater}
      */
-    void setState(ControllerServiceState state);
+    void disable(ScheduledExecutorService scheduler, Heartbeater heartbeater);
 
     /**
      * @return the ControllerServiceReference that describes which components are referencing this Controller Service
@@ -111,4 +133,17 @@ public interface ControllerServiceNode extends ConfiguredComponent {
     void verifyCanDelete();
 
     void verifyCanUpdate();
+
+    /**
+     * Returns 'true' if this service is active. The service is considered to be
+     * active if and only if it's
+     * {@link #enable(ScheduledExecutorService, long, Heartbeater)} operation
+     * has been invoked and the service has been transitioned to ENABLING state.
+     * The service will also remain 'active' after its been transitioned to
+     * ENABLED state. 
+     * <br>
+     * The service will be de-activated upon invocation of
+     * {@link #disable(ScheduledExecutorService, Heartbeater)}.
+     */
+    boolean isActive();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/909c0dec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index ddd4d5c..82fc812 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -19,23 +19,15 @@ package org.apache.nifi.controller.scheduling;
 import static java.util.Objects.requireNonNull;
 
 import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.nifi.annotation.lifecycle.OnDisabled;
-import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
@@ -54,8 +46,6 @@ import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.annotation.OnConfigured;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
-import org.apache.nifi.controller.service.ControllerServiceState;
-import org.apache.nifi.controller.service.StandardConfigurationContext;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.logging.ComponentLog;
@@ -89,7 +79,9 @@ public final class StandardProcessScheduler implements ProcessScheduler {
     private final ScheduledExecutorService frameworkTaskExecutor;
     private final ConcurrentMap<SchedulingStrategy, SchedulingAgent> strategyAgentMap = new ConcurrentHashMap<>();
     // thread pool for starting/stopping components
-    private final ExecutorService componentLifeCycleThreadPool = new ThreadPoolExecutor(25, 50, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(5000));
+
+    private final ScheduledExecutorService componentLifeCycleThreadPool = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
+
     private final StringEncryptor encryptor;
 
     public StandardProcessScheduler(final Heartbeater heartbeater, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor) {
@@ -625,145 +617,20 @@ public final class StandardProcessScheduler implements ProcessScheduler {
 
     @Override
     public void enableControllerService(final ControllerServiceNode service) {
-        service.setState(ControllerServiceState.ENABLING);
-        final ScheduleState scheduleState = getScheduleState(service);
-
-        final Runnable enableRunnable = new Runnable() {
-            @Override
-            public void run() {
-                try (final NarCloseable x = NarCloseable.withNarLoader()) {
-                    final long lastStopTime = scheduleState.getLastStopTime();
-                    final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider, null);
-
-                    while (true) {
-                        try {
-                            synchronized (scheduleState) {
-                                // if no longer enabled, then we're finished. This can happen, for example,
-                                // if the @OnEnabled method throws an Exception and the user disables the service
-                                // while we're administratively yielded.
-                                //
-                                // we also check if the schedule state's last stop time is equal to what it was before.
-                                // if not, then means that the service has been disabled and enabled again, so we should just
-                                // bail; another thread will be responsible for invoking the @OnEnabled methods.
-                                if (!scheduleState.isScheduled() || scheduleState.getLastStopTime() != lastStopTime) {
-                                    return;
-                                }
-
-                                ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, service.getControllerServiceImplementation(), configContext);
-                                heartbeater.heartbeat();
-                                service.setState(ControllerServiceState.ENABLED);
-                                return;
-                            }
-                        } catch (final Exception e) {
-                            final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
-
-                            final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
-                            componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
-                            LOG.error("Failed to invoke @OnEnabled method of {} due to {}", service.getControllerServiceImplementation(), cause.toString());
-                            if (LOG.isDebugEnabled()) {
-                                LOG.error("", cause);
-                            }
-
-                            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
-                            Thread.sleep(administrativeYieldMillis);
-                            continue;
-                        }
-                    }
-                } catch (final Throwable t) {
-                    final Throwable cause = t instanceof InvocationTargetException ? t.getCause() : t;
-                    final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
-                    componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
-
-                    LOG.error("Failed to invoke @OnEnabled method on {} due to {}", service.getControllerServiceImplementation(), cause.toString());
-                    if (LOG.isDebugEnabled()) {
-                        LOG.error("", cause);
-                    }
-                }
-            }
-        };
-
-        scheduleState.setScheduled(true);
-        componentLifeCycleThreadPool.execute(enableRunnable);
+        service.enable(this.componentLifeCycleThreadPool, this.administrativeYieldMillis, this.heartbeater);
     }
 
-
     @Override
     public void disableControllerService(final ControllerServiceNode service) {
-        disableControllerServices(Collections.singletonList(service));
+        service.disable(this.componentLifeCycleThreadPool, this.heartbeater);
     }
 
     @Override
     public void disableControllerServices(final List<ControllerServiceNode> services) {
-        if (requireNonNull(services).isEmpty()) {
-            return;
-        }
-
-        final List<ControllerServiceNode> servicesToDisable = new ArrayList<>(services.size());
-        for (final ControllerServiceNode serviceToDisable : services) {
-            if (serviceToDisable.getState() == ControllerServiceState.DISABLED || serviceToDisable.getState() == ControllerServiceState.DISABLING) {
-                continue;
-            }
-
-            servicesToDisable.add(serviceToDisable);
-        }
-
-        if (servicesToDisable.isEmpty()) {
-            return;
-        }
-
-        // ensure that all controller services can be disabled.
-        for (final ControllerServiceNode serviceNode : servicesToDisable) {
-            final Set<ControllerServiceNode> ignoredReferences = new HashSet<>(services);
-            ignoredReferences.remove(serviceNode);
-            serviceNode.verifyCanDisable(ignoredReferences);
-        }
-
-        // mark services as disabling
-        for (final ControllerServiceNode serviceNode : servicesToDisable) {
-            serviceNode.setState(ControllerServiceState.DISABLING);
-
-            final ScheduleState scheduleState = getScheduleState(serviceNode);
-            synchronized (scheduleState) {
-                scheduleState.setScheduled(false);
+        if (!requireNonNull(services).isEmpty()) {
+            for (ControllerServiceNode controllerServiceNode : services) {
+                this.disableControllerService(controllerServiceNode);
             }
         }
-
-        final Queue<ControllerServiceNode> nodes = new LinkedList<>(servicesToDisable);
-        final Runnable disableRunnable = new Runnable() {
-            @Override
-            public void run() {
-                ControllerServiceNode service;
-                while ((service = nodes.poll()) != null) {
-                    try (final NarCloseable x = NarCloseable.withNarLoader()) {
-                        final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider, null);
-
-                        try {
-                            ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
-                        } catch (final Exception e) {
-                            final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
-                            final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
-                            componentLog.error("Failed to invoke @OnDisabled method due to {}", cause);
-
-                            LOG.error("Failed to invoke @OnDisabled method of {} due to {}", service.getControllerServiceImplementation(), cause.toString());
-                            if (LOG.isDebugEnabled()) {
-                                LOG.error("", cause);
-                            }
-
-                            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
-                            try {
-                                Thread.sleep(administrativeYieldMillis);
-                            } catch (final InterruptedException ie) {
-                            }
-                        } finally {
-                            service.setState(ControllerServiceState.DISABLED);
-                            heartbeater.heartbeat();
-                        }
-                    }
-                }
-            }
-        };
-
-        componentLifeCycleThreadPool.execute(disableRunnable);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/909c0dec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 8e7f1f5..ce4a767 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -16,28 +16,41 @@
  */
 package org.apache.nifi.controller.service;
 
+import java.lang.reflect.InvocationTargetException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.AbstractConfiguredComponent;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ConfiguredComponent;
 import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.Heartbeater;
 import org.apache.nifi.controller.ValidationContextFactory;
 import org.apache.nifi.controller.annotation.OnConfigured;
 import org.apache.nifi.controller.exception.ComponentLifeCycleException;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.processor.SimpleProcessLogger;
 import org.apache.nifi.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class StandardControllerServiceNode extends AbstractConfiguredComponent implements ControllerServiceNode {
 
+    private static final Logger LOG = LoggerFactory.getLogger(StandardControllerServiceNode.class);
+
     private final ControllerService proxedControllerService;
     private final ControllerService implementation;
     private final ControllerServiceProvider serviceProvider;
@@ -51,12 +64,15 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
     private final Set<ConfiguredComponent> referencingComponents = new HashSet<>();
     private String comment;
 
+    private final AtomicBoolean active;
+
     public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id,
             final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
         super(implementation, id, validationContextFactory, serviceProvider);
         this.proxedControllerService = proxiedControllerService;
         this.implementation = implementation;
         this.serviceProvider = serviceProvider;
+        this.active = new AtomicBoolean();
     }
 
     @Override
@@ -146,8 +162,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
 
     @Override
     public void verifyCanDisable(final Set<ControllerServiceNode> ignoreReferences) {
-        final ControllerServiceState state = getState();
-        if (state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING) {
+        if (!this.isActive()) {
             throw new IllegalStateException("Cannot disable " + getControllerServiceImplementation() + " because it is not enabled");
         }
 
@@ -229,7 +244,106 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
     }
 
     @Override
-    public void setState(final ControllerServiceState state) {
-        this.stateRef.set(state);
+    public boolean isActive() {
+        return this.active.get();
+    }
+
+    /**
+     * Will atomically enable this service by invoking its @OnEnabled operation.
+     * It uses CAS operation on {@link #stateRef} to transition this service
+     * from DISABLED to ENABLING state. If such transition succeeds the service
+     * will be marked as 'active' (see {@link ControllerServiceNode#isActive()}).
+     * If such transition doesn't succeed then no enabling logic will be
+     * performed and the method will exit. In other words it is safe to invoke
+     * this operation multiple times and from multiple threads.
+     * <br>
+     * This operation will also perform re-try of service enabling in the event
+     * of exception being thrown by previous invocation of @OnEnabled.
+     * <br>
+     * Upon successful invocation of @OnEnabled this service will be transitioned to
+     * ENABLED state.
+     * <br>
+     * In the event where enabling took longer then expected by the user and such user
+     * initiated disable operation, this service will be automatically disabled as soon
+     * as it reached ENABLED state.
+     */
+    @Override
+    public void enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis, final Heartbeater heartbeater) {
+        if (this.stateRef.compareAndSet(ControllerServiceState.DISABLED, ControllerServiceState.ENABLING)){
+            this.active.set(true);
+            final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null);
+            scheduler.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, getControllerServiceImplementation(), configContext);
+                        if (stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.ENABLED)) {
+                            heartbeater.heartbeat();
+                        } else {
+                            LOG.debug("Disabling service " + this + " after it has been enabled due to disable action being initiated.");
+                            // Can only happen if user initiated DISABLE operation before service finished enabling. It's state will be
+                            // set to DISABLING (see disable() operation)
+                            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
+                            stateRef.set(ControllerServiceState.DISABLED);
+                        }
+                    } catch (Exception e) {
+                        final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
+                        final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
+                        componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
+                        LOG.error("Failed to invoke @OnEnabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
+                        ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
+                        if (isActive()) {
+                            scheduler.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS);
+                        }
+                        else {
+                            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
+                            stateRef.set(ControllerServiceState.DISABLED);
+                        }
+                    }
+                }
+            });
+        }
+    }
+
+    /**
+     * Will atomically disable this service by invoking its @OnDisabled operation.
+     * It uses CAS operation on {@link #stateRef} to transition this service
+     * from ENABLED to DISABLING state. If such transition succeeds the service
+     * will be de-activated (see {@link ControllerServiceNode#isActive()}).
+     * If such transition doesn't succeed (the service is still in ENABLING state)
+     * then the service will still be transitioned to DISABLING state to ensure that
+     * no other transition could happen on this service. However in such event
+     * (e.g., its @OnEnabled finally succeeded), the {@link #enable(ScheduledExecutorService, long, Heartbeater)}
+     * operation will initiate service disabling javadoc for (see {@link #enable(ScheduledExecutorService, long, Heartbeater)}
+     * <br>
+     * Upon successful invocation of @OnDisabled this service will be transitioned to
+     * DISABLED state.
+     */
+    @Override
+    public void disable(final ScheduledExecutorService scheduler, final Heartbeater heartbeater) {
+        this.active.set(false); // de-activating regardless of CAS operation
+                                // that follows since this operation will always result in service state being DISABLING
+        if (this.stateRef.compareAndSet(ControllerServiceState.ENABLED, ControllerServiceState.DISABLING)) {
+            final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null);
+            scheduler.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, StandardControllerServiceNode.this.getControllerServiceImplementation(), configContext);
+                    } catch (Exception e) {
+                        final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
+                        final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
+                        componentLog.error("Failed to invoke @OnDisabled method due to {}", cause);
+                        LOG.error("Failed to invoke @OnDisabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
+                        ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
+                    } finally {
+                        stateRef.set(ControllerServiceState.DISABLED);
+                        heartbeater.heartbeat();
+                    }
+                }
+            });
+        } else {
+            this.stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.DISABLING);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/909c0dec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 1f1a1c0..6561eb8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -203,14 +203,11 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
         // Get a list of all Controller Services that need to be disabled, in the order that they need to be
         // disabled.
         final List<ControllerServiceNode> toDisable = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
+
         final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable);
 
         for (final ControllerServiceNode nodeToDisable : toDisable) {
-            final ControllerServiceState state = nodeToDisable.getState();
-
-            if (state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING) {
-                nodeToDisable.verifyCanDisable(serviceSet);
-            }
+            nodeToDisable.verifyCanDisable(serviceSet);
         }
 
         Collections.reverse(toDisable);
@@ -319,14 +316,6 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
             logger.info("Will enable {} Controller Services", servicesToEnable.size());
         }
 
-        // Mark all services that are configured to be enabled as 'ENABLING'. This allows Processors, reporting tasks
-        // to be valid so that they can be scheduled.
-        for (final List<ControllerServiceNode> branch : branches) {
-            for (final ControllerServiceNode nodeToEnable : branch) {
-                nodeToEnable.setState(ControllerServiceState.ENABLING);
-            }
-        }
-
         final Set<ControllerServiceNode> enabledNodes = Collections.synchronizedSet(new HashSet<ControllerServiceNode>());
         final ExecutorService executor = Executors.newFixedThreadPool(Math.min(10, branches.size()));
         for (final List<ControllerServiceNode> branch : branches) {
@@ -422,6 +411,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
 
     @Override
     public void disableControllerService(final ControllerServiceNode serviceNode) {
+        serviceNode.verifyCanDisable();
         processScheduler.disableControllerService(serviceNode);
     }
 
@@ -545,23 +535,20 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
     }
 
     private void enableReferencingServices(final ControllerServiceNode serviceNode, final List<ControllerServiceNode> recursiveReferences) {
-        if (serviceNode.getState() != ControllerServiceState.ENABLED && serviceNode.getState() != ControllerServiceState.ENABLING) {
+        if (!serviceNode.isActive()) {
             serviceNode.verifyCanEnable(new HashSet<>(recursiveReferences));
         }
 
         final Set<ControllerServiceNode> ifEnabled = new HashSet<>();
-        final List<ControllerServiceNode> toEnable = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
-        for (final ControllerServiceNode nodeToEnable : toEnable) {
-            final ControllerServiceState state = nodeToEnable.getState();
-            if (state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING) {
+        for (final ControllerServiceNode nodeToEnable : recursiveReferences) {
+            if (!nodeToEnable.isActive()) {
                 nodeToEnable.verifyCanEnable(ifEnabled);
                 ifEnabled.add(nodeToEnable);
             }
         }
 
-        for (final ControllerServiceNode nodeToEnable : toEnable) {
-            final ControllerServiceState state = nodeToEnable.getState();
-            if (state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING) {
+        for (final ControllerServiceNode nodeToEnable : recursiveReferences) {
+            if (!nodeToEnable.isActive()) {
                 enableControllerService(nodeToEnable);
             }
         }
@@ -606,11 +593,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
         final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable);
 
         for (final ControllerServiceNode nodeToDisable : toDisable) {
-            final ControllerServiceState state = nodeToDisable.getState();
-
-            if (state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING) {
-                nodeToDisable.verifyCanDisable(serviceSet);
-            }
+            nodeToDisable.verifyCanDisable(serviceSet);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/909c0dec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
index 701adcf..d2f3833 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
@@ -66,8 +66,7 @@ public class StandardControllerServiceReference implements ControllerServiceRefe
             if (component instanceof ControllerServiceNode) {
                 serviceNodes.add((ControllerServiceNode) component);
 
-                final ControllerServiceState state = ((ControllerServiceNode) component).getState();
-                if (state != ControllerServiceState.DISABLED) {
+                if (((ControllerServiceNode) component).isActive()) {
                     activeReferences.add(component);
                 }
             } else if (isRunning(component)) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/909c0dec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
index 5575a23..1f655de 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
@@ -16,18 +16,31 @@
  */
 package org.apache.nifi.controller.scheduling;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.LockSupport;
 
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.Heartbeater;
+import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.StandardProcessorNode;
@@ -36,6 +49,7 @@ import org.apache.nifi.controller.reporting.StandardReportingInitializationConte
 import org.apache.nifi.controller.reporting.StandardReportingTaskNode;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.controller.service.StandardControllerServiceProvider;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
@@ -116,7 +130,13 @@ public class TestStandardProcessScheduler {
         Thread.sleep(1000L);
 
         scheduler.stopProcessor(procNode);
+        assertTrue(service.isActive());
+        assertTrue(service.getState() == ControllerServiceState.ENABLING);
         scheduler.disableControllerService(service);
+        assertTrue(service.getState() == ControllerServiceState.DISABLING);
+        assertFalse(service.isActive());
+        Thread.sleep(1000);
+        assertTrue(service.getState() == ControllerServiceState.DISABLED);
     }
 
 
@@ -169,4 +189,237 @@ public class TestStandardProcessScheduler {
             throw new IllegalStateException(e);
         }
     }
+    /**
+     * Validates the atomic nature of ControllerServiceNode.enable() method
+     * which must only trigger @OnEnabled once, regardless of how many threads
+     * may have a reference to the underlying ProcessScheduler and
+     * ControllerServiceNode.
+     */
+    @Test
+    public void validateServiceEnablementLogicHappensOnlyOnce() throws Exception {
+        final ProcessScheduler scheduler = createScheduler();
+        StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
+        final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
+                "1", false);
+        assertFalse(serviceNode.isActive());
+        SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final AtomicBoolean asyncFailed = new AtomicBoolean();
+        for (int i = 0; i < 1000; i++) {
+            executor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        scheduler.enableControllerService(serviceNode);
+                        assertTrue(serviceNode.isActive());
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        asyncFailed.set(true);
+                    }
+                }
+            });
+        }
+        // need to sleep a while since we are emulating async invocations on
+        // a method that is also internally async
+        Thread.sleep(500);
+        executor.shutdown();
+        assertFalse(asyncFailed.get());
+        assertEquals(1, ts.enableInvocationCount());
+    }
+
+    /**
+     * Validates the atomic nature of the ControllerServiceNode.disable(..)
+     * method, which must never trigger @OnDisabled for a service that was
+     * never enabled, regardless of how many threads may have a reference to
+     * the underlying ProcessScheduler and ControllerServiceNode.
+     */
+    @Test
+    public void validateDisabledServiceCantBeDisabled() throws Exception {
+        final ProcessScheduler scheduler = createScheduler();
+        StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
+        final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
+                "1", false);
+        SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final AtomicBoolean asyncFailed = new AtomicBoolean();
+        for (int i = 0; i < 1000; i++) {
+            executor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        scheduler.disableControllerService(serviceNode);
+                        assertFalse(serviceNode.isActive());
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        asyncFailed.set(true);
+                    }
+                }
+            });
+        }
+        // need to sleep a while since we are emulating async invocations on
+        // a method that is also internally async
+        Thread.sleep(500);
+        executor.shutdown();
+        assertFalse(asyncFailed.get());
+        assertEquals(0, ts.disableInvocationCount());
+    }
+
+    /**
+     * Validates the atomic nature of ControllerServiceNode.disable() method
+     * which must only trigger @OnDisabled once, regardless of how many threads
+     * may have a reference to the underlying ProcessScheduler and
+     * ControllerServiceNode.
+     */
+    @Test
+    public void validateEnabledServiceCanOnlyBeDisabledOnce() throws Exception {
+        final ProcessScheduler scheduler = createScheduler();
+        StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
+        final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
+                "1", false);
+        SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
+        scheduler.enableControllerService(serviceNode);
+        assertTrue(serviceNode.isActive());
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final AtomicBoolean asyncFailed = new AtomicBoolean();
+        for (int i = 0; i < 1000; i++) {
+            executor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        scheduler.disableControllerService(serviceNode);
+                        assertFalse(serviceNode.isActive());
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        asyncFailed.set(true);
+                    }
+                }
+            });
+        }
+        // need to sleep a while since we are emulating async invocations on
+        // a method that is also internally async
+        Thread.sleep(500);
+        executor.shutdown();
+        assertFalse(asyncFailed.get());
+        assertEquals(1, ts.disableInvocationCount());
+    }
+
+    /**
+     * Validates that a service that blocks indefinitely in @OnEnabled can
+     * still have a DISABLE operation initiated. The service itself will be set
+     * to the DISABLING state, at which point the UI and all other components
+     * will know that the service cannot be transitioned into any other state
+     * until it finishes enabling (which will never happen in this case and thus
+     * must be addressed by the user). However, regardless of the user's
+     * mistake, NiFi will remain functional.
+     */
+    @Test
+    public void validateNeverEnablingServiceCanStillBeDisabled() throws Exception {
+        final ProcessScheduler scheduler = createScheduler();
+        StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
+        final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(),
+                "1", false);
+        LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation();
+        ts.setLimit(Long.MAX_VALUE);
+        scheduler.enableControllerService(serviceNode);
+        Thread.sleep(100);
+        assertTrue(serviceNode.isActive());
+        assertEquals(1, ts.enableInvocationCount());
+
+        Thread.sleep(1000);
+        scheduler.disableControllerService(serviceNode);
+        assertFalse(serviceNode.isActive());
+        assertEquals(ControllerServiceState.DISABLING, serviceNode.getState());
+        assertEquals(0, ts.disableInvocationCount());
+    }
+
+    /**
+     * Validates that the service that is currently in ENABLING state can be
+     * disabled and that its @OnDisabled operation will be invoked as soon
+     * as @OnEnabled finishes.
+     */
+    @Test
+    public void validateLongEnablingServiceCanStillBeDisabled() throws Exception {
+        final ProcessScheduler scheduler = createScheduler();
+        StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
+        final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(),
+                "1", false);
+        LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation();
+        ts.setLimit(3000);
+        scheduler.enableControllerService(serviceNode);
+        Thread.sleep(100);
+        assertTrue(serviceNode.isActive());
+        assertEquals(1, ts.enableInvocationCount());
+
+        Thread.sleep(100);
+        scheduler.disableControllerService(serviceNode);
+        assertFalse(serviceNode.isActive());
+        assertEquals(ControllerServiceState.DISABLING, serviceNode.getState());
+        assertEquals(0, ts.disableInvocationCount());
+        // wait a bit. . . Enabling will finish and @OnDisabled will be invoked
+        // automatically
+        Thread.sleep(3000);
+        assertEquals(ControllerServiceState.DISABLED, serviceNode.getState());
+        assertEquals(1, ts.disableInvocationCount());
+    }
+
+    public static class SimpleTestService extends AbstractControllerService {
+
+        private final AtomicInteger enableCounter = new AtomicInteger();
+        private final AtomicInteger disableCounter = new AtomicInteger();
+
+        @OnEnabled
+        public void enable(ConfigurationContext context) {
+            this.enableCounter.incrementAndGet();
+        }
+
+        @OnDisabled
+        public void disable(ConfigurationContext context) {
+            this.disableCounter.incrementAndGet();
+        }
+
+        public int enableInvocationCount() {
+            return this.enableCounter.get();
+        }
+
+        public int disableInvocationCount() {
+            return this.disableCounter.get();
+        }
+    }
+
+    public static class LongEnablingService extends AbstractControllerService {
+        private final AtomicInteger enableCounter = new AtomicInteger();
+        private final AtomicInteger disableCounter = new AtomicInteger();
+
+        private volatile long limit;
+
+        @OnEnabled
+        public void enable(ConfigurationContext context) throws Exception {
+            this.enableCounter.incrementAndGet();
+            Thread.sleep(limit);
+        }
+
+        @OnDisabled
+        public void disable(ConfigurationContext context) {
+            this.disableCounter.incrementAndGet();
+        }
+
+        public int enableInvocationCount() {
+            return this.enableCounter.get();
+        }
+
+        public int disableInvocationCount() {
+            return this.disableCounter.get();
+        }
+
+        public void setLimit(long limit) {
+            this.limit = limit;
+        }
+    }
+
+    private ProcessScheduler createScheduler() {
+        return new StandardProcessScheduler(mock(Heartbeater.class), null, null);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/909c0dec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
index 5cd3648..2400009 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
@@ -120,7 +120,7 @@ public class TestStandardControllerServiceProvider {
     @Test(timeout = 10000)
     public void testConcurrencyWithEnablingReferencingServicesGraph() {
         final ProcessScheduler scheduler = createScheduler();
-        for (int i = 0; i < 1000; i++) {
+        for (int i = 0; i < 10000; i++) {
             testEnableReferencingServicesGraph(scheduler);
         }
     }


[4/5] nifi git commit: Merge branch 'NIFI-1164' of https://github.com/olegz/nifi into NIFI-1164

Posted by ma...@apache.org.
Merge branch 'NIFI-1164' of https://github.com/olegz/nifi into NIFI-1164


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/71544cd2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/71544cd2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/71544cd2

Branch: refs/heads/master
Commit: 71544cd22b0e1269bc3f3afa672da3897a6176ed
Parents: 03730ad 602c4a9
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Dec 31 15:22:09 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Dec 31 15:22:09 2015 -0500

----------------------------------------------------------------------
 .../service/ControllerServiceNode.java          |  40 ++-
 .../scheduling/StandardProcessScheduler.java    | 151 +--------
 .../service/StandardControllerServiceNode.java  | 139 +++++++-
 .../StandardControllerServiceProvider.java      |  35 +-
 .../StandardControllerServiceReference.java     |   3 +-
 .../TestStandardProcessScheduler.java           | 338 +++++++++++++++++++
 .../TestStandardControllerServiceProvider.java  |   2 +-
 7 files changed, 530 insertions(+), 178 deletions(-)
----------------------------------------------------------------------



[5/5] nifi git commit: NIFI-1164: Fixed contrib-check issues, log message

Posted by ma...@apache.org.
NIFI-1164: Fixed contrib-check issues, log message


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8e031c98
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8e031c98
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8e031c98

Branch: refs/heads/master
Commit: 8e031c987bc0781f02e1ca90b37ffc379f835eeb
Parents: 71544cd
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Dec 31 15:37:00 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Dec 31 15:37:00 2015 -0500

----------------------------------------------------------------------
 .../controller/service/StandardControllerServiceNode.java    | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/8e031c98/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 6062db7..ba03ee3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -292,11 +292,15 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
                             stateRef.set(ControllerServiceState.DISABLED);
                         }
                     } catch (Exception e) {
+                        final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
+                        final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
+                        componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
+                        LOG.error("Failed to invoke @OnEnabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
                         invokeDisable(configContext, heartbeater);
+
                         if (isActive()) {
                             scheduler.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS);
-                        }
-                        else {
+                        } else {
                             ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
                             stateRef.set(ControllerServiceState.DISABLED);
                         }


[3/5] nifi git commit: NIFI-1164 addressed latest PR comments

Posted by ma...@apache.org.
NIFI-1164 addressed latest PR comments


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/602c4a96
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/602c4a96
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/602c4a96

Branch: refs/heads/master
Commit: 602c4a9639af39d407976b3840bd842e5b665b2c
Parents: 0d09054
Author: Oleg Zhurakousky <ol...@suitcase.io>
Authored: Mon Dec 28 16:26:00 2015 -0500
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Mon Dec 28 16:26:00 2015 -0500

----------------------------------------------------------------------
 .../controller/service/ControllerServiceNode.java     |  6 ++----
 .../service/StandardControllerServiceNode.java        | 14 ++++----------
 2 files changed, 6 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/602c4a96/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index 7dd57d3..4f1a961 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -66,9 +66,8 @@ public interface ControllerServiceNode extends ConfiguredComponent {
      *            the amount of milliseconds to wait for administrative yield
      * @param heartbeater
      *            the instance of {@link Heartbeater}
-     * @return 'true' if service was enabled
      */
-    boolean enable(ScheduledExecutorService scheduler, long administrativeYieldMillis, Heartbeater heartbeater);
+    void enable(ScheduledExecutorService scheduler, long administrativeYieldMillis, Heartbeater heartbeater);
 
     /**
      * Will disable this service. Disabling of the service typically means
@@ -79,9 +78,8 @@ public interface ControllerServiceNode extends ConfiguredComponent {
      *            initiate service disabling task
      * @param heartbeater
      *            the instance of {@link Heartbeater}
-     * @return 'true' if service was disabled
      */
-    boolean disable(ScheduledExecutorService scheduler, Heartbeater heartbeater);
+    void disable(ScheduledExecutorService scheduler, Heartbeater heartbeater);
 
     /**
      * @return the ControllerServiceReference that describes which components are referencing this Controller Service

http://git-wip-us.apache.org/repos/asf/nifi/blob/602c4a96/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index a0fa200..6062db7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -268,7 +268,8 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
      * as it reached ENABLED state.
      */
     @Override
-    public boolean enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis, final Heartbeater heartbeater) {
+    public void enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis,
+            final Heartbeater heartbeater) {
         if (this.stateRef.compareAndSet(ControllerServiceState.DISABLED, ControllerServiceState.ENABLING)) {
             this.active.set(true);
             final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null);
@@ -278,7 +279,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
                     try {
                         ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, getControllerServiceImplementation(), configContext);
                         boolean shouldEnable = false;
-                        synchronized (configContext) {
+                        synchronized (active) {
                             shouldEnable = active.get() && stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.ENABLED);
                         }
                         if (shouldEnable) {
@@ -291,10 +292,6 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
                             stateRef.set(ControllerServiceState.DISABLED);
                         }
                     } catch (Exception e) {
-                        final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
-                        final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
-                        componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
-                        LOG.error("Failed to invoke @OnEnabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
                         invokeDisable(configContext, heartbeater);
                         if (isActive()) {
                             scheduler.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS);
@@ -307,7 +304,6 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
                 }
             });
         }
-        return this.active.get();
     }
 
     /**
@@ -325,7 +321,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
      * DISABLED state.
      */
     @Override
-    public boolean disable(final ScheduledExecutorService scheduler, final Heartbeater heartbeater) {
+    public void disable(ScheduledExecutorService scheduler, final Heartbeater heartbeater) {
         /*
          * The reason for synchronization is to ensure consistency of the
          * service state when another thread is in the middle of enabling this
@@ -352,7 +348,6 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
         } else {
             this.stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.DISABLING);
         }
-        return !this.active.get();
     }
 
     /**
@@ -366,7 +361,6 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
             final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
             componentLog.error("Failed to invoke @OnDisabled method due to {}", cause);
             LOG.error("Failed to invoke @OnDisabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
-            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
         }
     }
 }


[2/5] nifi git commit: NIFI-1164 addressed PR comment Added isActive check to the StandardControllerServiceNode:280 to ensure that the IF statement can only have a chance to succeed if service is active. The service will be indiscriminately deactivated a

Posted by ma...@apache.org.
NIFI-1164 addressed PR comment
Added an isActive check to StandardControllerServiceNode:280 to ensure that
the IF statement can only succeed if the service is active. The service
will be indiscriminately deactivated as soon as the disable(..) operation is invoked. This in itself will
eliminate the race condition discovered by Mark

NIFI-1164 addressed PR comments
fixed the race condition described by Mark during disable call

NIFI-1164 polished javadoc


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0d09054d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0d09054d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0d09054d

Branch: refs/heads/master
Commit: 0d09054d9f40f433c156712421c18cde313b1e07
Parents: 909c0de
Author: Oleg Zhurakousky <ol...@suitcase.io>
Authored: Mon Dec 28 11:11:58 2015 -0500
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Mon Dec 28 15:16:32 2015 -0500

----------------------------------------------------------------------
 .../service/ControllerServiceNode.java          |  9 +-
 .../service/StandardControllerServiceNode.java  | 53 ++++++++----
 .../TestStandardProcessScheduler.java           | 89 +++++++++++++++++++-
 3 files changed, 130 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0d09054d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index b229af0..7dd57d3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -66,8 +66,9 @@ public interface ControllerServiceNode extends ConfiguredComponent {
      *            the amount of milliseconds to wait for administrative yield
      * @param heartbeater
      *            the instance of {@link Heartbeater}
+     * @return 'true' if service was enabled
      */
-    void enable(ScheduledExecutorService scheduler, long administrativeYieldMillis, Heartbeater heartbeater);
+    boolean enable(ScheduledExecutorService scheduler, long administrativeYieldMillis, Heartbeater heartbeater);
 
     /**
      * Will disable this service. Disabling of the service typically means
@@ -78,8 +79,9 @@ public interface ControllerServiceNode extends ConfiguredComponent {
      *            initiate service disabling task
      * @param heartbeater
      *            the instance of {@link Heartbeater}
+     * @return 'true' if service was disabled
      */
-    void disable(ScheduledExecutorService scheduler, Heartbeater heartbeater);
+    boolean disable(ScheduledExecutorService scheduler, Heartbeater heartbeater);
 
     /**
      * @return the ControllerServiceReference that describes which components are referencing this Controller Service
@@ -140,8 +142,7 @@ public interface ControllerServiceNode extends ConfiguredComponent {
      * {@link #enable(ScheduledExecutorService, long, Heartbeater)} operation
      * has been invoked and the service has been transitioned to ENABLING state.
      * The service will also remain 'active' after its been transitioned to
-     * ENABLED state. 
-     * <br>
+     * ENABLED state. <br>
      * The service will be de-activated upon invocation of
      * {@link #disable(ScheduledExecutorService, Heartbeater)}.
      */

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d09054d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index ce4a767..a0fa200 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -268,8 +268,8 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
      * as it reached ENABLED state.
      */
     @Override
-    public void enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis, final Heartbeater heartbeater) {
-        if (this.stateRef.compareAndSet(ControllerServiceState.DISABLED, ControllerServiceState.ENABLING)){
+    public boolean enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis, final Heartbeater heartbeater) {
+        if (this.stateRef.compareAndSet(ControllerServiceState.DISABLED, ControllerServiceState.ENABLING)) {
             this.active.set(true);
             final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null);
             scheduler.execute(new Runnable() {
@@ -277,13 +277,17 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
                 public void run() {
                     try {
                         ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, getControllerServiceImplementation(), configContext);
-                        if (stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.ENABLED)) {
+                        boolean shouldEnable = false;
+                        synchronized (configContext) {
+                            shouldEnable = active.get() && stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.ENABLED);
+                        }
+                        if (shouldEnable) {
                             heartbeater.heartbeat();
                         } else {
                             LOG.debug("Disabling service " + this + " after it has been enabled due to disable action being initiated.");
                             // Can only happen if user initiated DISABLE operation before service finished enabling. It's state will be
                             // set to DISABLING (see disable() operation)
-                            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
+                            invokeDisable(configContext, heartbeater);
                             stateRef.set(ControllerServiceState.DISABLED);
                         }
                     } catch (Exception e) {
@@ -291,7 +295,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
                         final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
                         componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
                         LOG.error("Failed to invoke @OnEnabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
-                        ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
+                        invokeDisable(configContext, heartbeater);
                         if (isActive()) {
                             scheduler.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS);
                         }
@@ -303,6 +307,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
                 }
             });
         }
+        return this.active.get();
     }
 
     /**
@@ -320,22 +325,24 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
      * DISABLED state.
      */
     @Override
-    public void disable(final ScheduledExecutorService scheduler, final Heartbeater heartbeater) {
-        this.active.set(false); // de-activating regardless of CAS operation
-                                // that follows since this operation will always result in service state being DISABLING
+    public boolean disable(final ScheduledExecutorService scheduler, final Heartbeater heartbeater) {
+        /*
+         * The reason for synchronization is to ensure consistency of the
+         * service state when another thread is in the middle of enabling this
+         * service since it will attempt to transition service state from
+         * ENABLING to ENABLED but only if it's active.
+         */
+        synchronized (this.active) {
+            this.active.set(false);
+        }
+
         if (this.stateRef.compareAndSet(ControllerServiceState.ENABLED, ControllerServiceState.DISABLING)) {
             final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null);
             scheduler.execute(new Runnable() {
                 @Override
                 public void run() {
                     try {
-                        ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, StandardControllerServiceNode.this.getControllerServiceImplementation(), configContext);
-                    } catch (Exception e) {
-                        final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
-                        final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
-                        componentLog.error("Failed to invoke @OnDisabled method due to {}", cause);
-                        LOG.error("Failed to invoke @OnDisabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
-                        ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
+                        invokeDisable(configContext, heartbeater);
                     } finally {
                         stateRef.set(ControllerServiceState.DISABLED);
                         heartbeater.heartbeat();
@@ -345,5 +352,21 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
         } else {
             this.stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.DISABLING);
         }
+        return !this.active.get();
+    }
+
+    /**
+     *
+     */
+    private void invokeDisable(ConfigurationContext configContext, Heartbeater heartbeater) {
+        try {
+            ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, StandardControllerServiceNode.this.getControllerServiceImplementation(), configContext);
+        } catch (Exception e) {
+            final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
+            final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
+            componentLog.error("Failed to invoke @OnDisabled method due to {}", cause);
+            LOG.error("Failed to invoke @OnDisabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
+            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d09054d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
index 1f655de..28dd298 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
@@ -24,14 +24,13 @@ import static org.mockito.Mockito.mock;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.LockSupport;
 
 import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
@@ -50,6 +49,7 @@ import org.apache.nifi.controller.reporting.StandardReportingTaskNode;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.controller.service.StandardControllerServiceNode;
 import org.apache.nifi.controller.service.StandardControllerServiceProvider;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
@@ -306,6 +306,71 @@ public class TestStandardProcessScheduler {
         assertEquals(1, ts.disableInvocationCount());
     }
 
+    @Test
+    public void validateDisablingOfTheFailedService() throws Exception {
+        final ProcessScheduler scheduler = createScheduler();
+        StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
+        final ControllerServiceNode serviceNode = provider.createControllerService(FailingService.class.getName(),
+                "1", false);
+        scheduler.enableControllerService(serviceNode);
+        Thread.sleep(1000);
+        scheduler.shutdown();
+        /*
+         * Because it was never disabled it will remain active since its
+         * enabling is being retried. This may actually be a bug in the
+         * scheduler since it probably has to shut down all components (disable
+         * services, shut down processors etc) before shutting down itself
+         */
+        assertTrue(serviceNode.isActive());
+        assertTrue(serviceNode.getState() == ControllerServiceState.ENABLING);
+    }
+
+    /**
+     * Validates that in a multi-threaded environment a service that is still
+     * enabling can be disabled. This test is set up in such a way that disabling
+     * of the service could be initiated by both the disable and enable methods.
+     * In other words, it tests two conditions in
+     * {@link StandardControllerServiceNode#disable(java.util.concurrent.ScheduledExecutorService, Heartbeater)}
+     * where the disabling of the service can be initiated right there (if
+     * ENABLED), or, if the service is still enabling, its disabling will be
+     * deferred to the logic in
+     * {@link StandardControllerServiceNode#enable(java.util.concurrent.ScheduledExecutorService, long, Heartbeater)}
+     * In any event, the resulting state of the service is DISABLED.
+     */
+    @Test
+    public void validateEnabledDisableMultiThread() throws Exception {
+        final ProcessScheduler scheduler = createScheduler();
+        StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
+        ExecutorService executor = Executors.newCachedThreadPool();
+        for (int i = 0; i < 200; i++) {
+            final ControllerServiceNode serviceNode = provider
+                    .createControllerService(RandomShortDelayEnablingService.class.getName(), "1", false);
+
+            executor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    scheduler.enableControllerService(serviceNode);
+                }
+            });
+            Thread.sleep(2); // ensure that enable gets initiated before disable
+            executor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    scheduler.disableControllerService(serviceNode);
+                }
+            });
+            Thread.sleep(25);
+            assertFalse(serviceNode.isActive());
+            assertTrue(serviceNode.getState() == ControllerServiceState.DISABLED);
+        }
+
+        // need to sleep a while since we are emulating async invocations on
+        // a method that is also internally async
+        Thread.sleep(500);
+        executor.shutdown();
+        executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
+    }
+
     /**
      * Validates that service that is infinitely blocking in @OnEnabled can
      * still have DISABLE operation initiated. The service itself will be set to
@@ -365,6 +430,26 @@ public class TestStandardProcessScheduler {
         assertEquals(1, ts.disableInvocationCount());
     }
 
+    public static class FailingService extends AbstractControllerService {
+        @OnEnabled
+        public void enable(ConfigurationContext context) {
+            throw new RuntimeException("intentional");
+        }
+    }
+
+    public static class RandomShortDelayEnablingService extends AbstractControllerService {
+        private final Random random = new Random();
+
+        @OnEnabled
+        public void enable(ConfigurationContext context) {
+            try {
+                Thread.sleep(random.nextInt(20));
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
     public static class SimpleTestService extends AbstractControllerService {
 
         private final AtomicInteger enableCounter = new AtomicInteger();