You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/10/15 19:22:18 UTC

[2/2] nifi git commit: NIFI-5686: Updated StandardProcessScheduler so that if it fails to schedule a Reporting Task, it re-schedules the @OnScheduled task instead of looping and calling Thread.sleep. As it was, the single-threaded Process Scheduler was,

NIFI-5686: Updated StandardProcessScheduler so that if it fails to schedule a Reporting Task, it re-schedules the @OnScheduled task instead of looping and calling Thread.sleep. As it was, because the Process Scheduler is single-threaded, when calling ProcessScheduler.unschedule(), the unschedule task was not executing because the schedule task was using the only thread. By switching the logic to schedule the task for later and return, instead of calling Thread.sleep and looping, we are able to avoid blocking the one thread in the thread pool. Also, performed some trivial code cleanup and updated erroneous links in Javadocs.

NIFI-5686: Fixed unit test in TestSocketLoadBalancedFlowFileQueue; renamed TestProcessorLifecycle to ProcessorLifecycleIT as it is testing integration between many components and largely focuses on high numbers of concurrent tasks to see if it can trigger any threading bugs that may get introduced

NIFI-5686: Extended unit test timeouts
Signed-off-by: Matthew Burgess <ma...@apache.org>

This closes #3062


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/32db43b3
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/32db43b3
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/32db43b3

Branch: refs/heads/master
Commit: 32db43b3069b3c982b9c96576c11b447c229b360
Parents: 218063a
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Oct 11 11:45:41 2018 -0400
Committer: Matthew Burgess <ma...@apache.org>
Committed: Mon Oct 15 15:21:05 2018 -0400

----------------------------------------------------------------------
 .../scheduling/StandardProcessScheduler.java    | 110 ++-
 .../TestSocketLoadBalancedFlowFileQueue.java    |  20 +-
 .../scheduling/ProcessorLifecycleIT.java        | 868 +++++++++++++++++++
 .../scheduling/TestProcessorLifecycle.java      | 868 -------------------
 .../TestStandardProcessScheduler.java           |  64 +-
 .../standard/TestHandleHttpRequest.java         |  69 +-
 6 files changed, 996 insertions(+), 1003 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/32db43b3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index b23e763..0459372 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -16,19 +16,6 @@
  */
 package org.apache.nifi.controller.scheduling;
 
-import static java.util.Objects.requireNonNull;
-
-import java.lang.reflect.InvocationTargetException;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
@@ -53,6 +40,7 @@ import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.SimpleProcessLogger;
 import org.apache.nifi.processor.StandardProcessContext;
@@ -64,9 +52,21 @@ import org.apache.nifi.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Objects.requireNonNull;
+
 /**
- * Responsible for scheduling Processors, Ports, and Funnels to run at regular
- * intervals
+ * Responsible for scheduling Processors, Ports, and Funnels to run at regular intervals
  */
 public final class StandardProcessScheduler implements ProcessScheduler {
 
@@ -196,44 +196,39 @@ public final class StandardProcessScheduler implements ProcessScheduler {
                 final long lastStopTime = lifecycleState.getLastStopTime();
                 final ReportingTask reportingTask = taskNode.getReportingTask();
 
-                // Continually attempt to start the Reporting Task, and if we fail sleep for a bit each time.
-                while (true) {
-                    try {
-                        synchronized (lifecycleState) {
-                            // if no longer scheduled to run, then we're finished. This can happen, for example,
-                            // if the @OnScheduled method throws an Exception and the user stops the reporting task
-                            // while we're administratively yielded.
-                            // we also check if the schedule state's last start time is equal to what it was before.
-                            // if not, then means that the reporting task has been stopped and started again, so we should just
-                            // bail; another thread will be responsible for invoking the @OnScheduled methods.
-                            if (!lifecycleState.isScheduled() || lifecycleState.getLastStopTime() != lastStopTime) {
-                                return;
-                            }
-
-                            try (final NarCloseable x = NarCloseable.withComponentNarLoader(reportingTask.getClass(), reportingTask.getIdentifier())) {
-                                ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, reportingTask, taskNode.getConfigurationContext());
-                            }
-
-                            agent.schedule(taskNode, lifecycleState);
+                // Attempt to start the Reporting Task, and if we fail, re-schedule the task again after #administrativeYieldMillis milliseconds
+                try {
+                    synchronized (lifecycleState) {
+                        // if no longer scheduled to run, then we're finished. This can happen, for example,
+                        // if the @OnScheduled method throws an Exception and the user stops the reporting task
+                        // while we're administratively yielded.
+                        // we also check if the schedule state's last stop time is equal to what it was before.
+                        // if not, then it means that the reporting task has been stopped and started again, so we should just
+                        // bail; another thread will be responsible for invoking the @OnScheduled methods.
+                        if (!lifecycleState.isScheduled() || lifecycleState.getLastStopTime() != lastStopTime) {
+                            LOG.debug("Did not complete invocation of @OnScheduled task for {} but Lifecycle State is no longer scheduled. Will not attempt to invoke task anymore", reportingTask);
                             return;
                         }
-                    } catch (final Exception e) {
-                        final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
-                        final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
-                        componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
-
-                        LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this "
-                                + "ReportingTask and will attempt to schedule it again after {}",
-                                new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
 
-                        ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, taskNode.getConfigurationContext());
-                        ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, reportingTask, taskNode.getConfigurationContext());
-
-                        try {
-                            Thread.sleep(administrativeYieldMillis);
-                        } catch (final InterruptedException ie) {
+                        try (final NarCloseable x = NarCloseable.withComponentNarLoader(reportingTask.getClass(), reportingTask.getIdentifier())) {
+                            ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, reportingTask, taskNode.getConfigurationContext());
                         }
+
+                        agent.schedule(taskNode, lifecycleState);
                     }
+                } catch (final Exception e) {
+                    final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
+                    final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
+                    componentLog.error("Failed to invoke @OnScheduled method due to {}", cause);
+
+                    LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this "
+                            + "ReportingTask and will attempt to schedule it again after {}",
+                            new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
+
+                    ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, taskNode.getConfigurationContext());
+                    ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, reportingTask, taskNode.getConfigurationContext());
+
+                    componentLifeCycleThreadPool.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS);
                 }
             }
         };
@@ -262,10 +257,8 @@ public final class StandardProcessScheduler implements ProcessScheduler {
                 synchronized (lifecycleState) {
                     lifecycleState.setScheduled(false);
 
-                    try {
-                        try (final NarCloseable x = NarCloseable.withComponentNarLoader(reportingTask.getClass(), reportingTask.getIdentifier())) {
-                            ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, configurationContext);
-                        }
+                    try (final NarCloseable x = NarCloseable.withComponentNarLoader(reportingTask.getClass(), reportingTask.getIdentifier())) {
+                        ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, configurationContext);
                     } catch (final Exception e) {
                         final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
                         final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
@@ -274,11 +267,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
                         LOG.error("Failed to invoke the @OnUnscheduled methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
                                 reportingTask, cause.toString(), administrativeYieldDuration);
                         LOG.error("", cause);
-
-                        try {
-                            Thread.sleep(administrativeYieldMillis);
-                        } catch (final InterruptedException ie) {
-                        }
                     }
 
                     agent.unschedule(taskNode, lifecycleState);
@@ -295,10 +283,10 @@ public final class StandardProcessScheduler implements ProcessScheduler {
 
     /**
      * Starts the given {@link Processor} by invoking its
-     * {@link ProcessorNode#start(ScheduledExecutorService, long, org.apache.nifi.processor.ProcessContext, Runnable)}
+     * {@link ProcessorNode#start(ScheduledExecutorService, long, ProcessContext, SchedulingAgentCallback, boolean)}
      * method.
      *
-     * @see StandardProcessorNode#start(ScheduledExecutorService, long, org.apache.nifi.processor.ProcessContext, Runnable)
+     * @see StandardProcessorNode#start(ScheduledExecutorService, long, ProcessContext, SchedulingAgentCallback, boolean)
      */
     @Override
     public synchronized CompletableFuture<Void> startProcessor(final ProcessorNode procNode, final boolean failIfStopping) {
@@ -335,10 +323,10 @@ public final class StandardProcessScheduler implements ProcessScheduler {
 
     /**
      * Stops the given {@link Processor} by invoking its
-     * {@link ProcessorNode#stop(ScheduledExecutorService, org.apache.nifi.processor.ProcessContext, SchedulingAgent, LifecycleState)}
+     * {@link ProcessorNode#stop(ProcessScheduler, ScheduledExecutorService, ProcessContext, SchedulingAgent, LifecycleState)}
      * method.
      *
-     * @see StandardProcessorNode#stop(ScheduledExecutorService, org.apache.nifi.processor.ProcessContext, SchedulingAgent, LifecycleState)
+     * @see StandardProcessorNode#stop(ProcessScheduler, ScheduledExecutorService, ProcessContext, SchedulingAgent, LifecycleState)
      */
     @Override
     public synchronized CompletableFuture<Void> stopProcessor(final ProcessorNode procNode) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/32db43b3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
index 971770a..bb1ad49 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
@@ -96,12 +96,12 @@ public class TestSocketLoadBalancedFlowFileQueue {
         swapManager = new MockSwapManager();
         eventReporter = EventReporter.NO_OP;
 
-        final NodeIdentifier localNodeIdentifier = createNodeIdentifier();
+        final NodeIdentifier localNodeIdentifier = createNodeIdentifier("00000000-0000-0000-0000-000000000000");
 
         nodeIds = new ArrayList<>();
         nodeIds.add(localNodeIdentifier);
-        nodeIds.add(createNodeIdentifier());
-        nodeIds.add(createNodeIdentifier());
+        nodeIds.add(createNodeIdentifier("11111111-1111-1111-1111-111111111111"));
+        nodeIds.add(createNodeIdentifier("22222222-2222-2222-2222-222222222222"));
 
         Mockito.doAnswer(new Answer<Set<NodeIdentifier>>() {
             @Override
@@ -128,7 +128,11 @@ public class TestSocketLoadBalancedFlowFileQueue {
     }
 
     private NodeIdentifier createNodeIdentifier() {
-        return new NodeIdentifier(UUID.randomUUID().toString(), "localhost", nodePort++, "localhost", nodePort++,
+        return createNodeIdentifier(UUID.randomUUID().toString());
+    }
+
+    private NodeIdentifier createNodeIdentifier(final String uuid) {
+        return new NodeIdentifier(uuid, "localhost", nodePort++, "localhost", nodePort++,
             "localhost", nodePort++, "localhost", nodePort++, nodePort++, true, Collections.emptySet());
     }
 
@@ -339,9 +343,12 @@ public class TestSocketLoadBalancedFlowFileQueue {
         }
     }
 
-    @Test(timeout = 10000)
+
+    @Test(timeout = 30000)
     public void testChangeInClusterTopologyTriggersRebalanceOnlyOnRemovedNodeIfNecessary() throws InterruptedException {
-        // Create partitioner that sends first 2 FlowFiles to Partition 0, next 2 to Partition 1, and then next 4 to Partition 3.
+        // Create partitioner that sends first 1 FlowFile to Partition 0, next 1 to Partition 1, and then next 2 to Partition 2.
+        // Then, cycle back to partitions 0 and 1. This will result in partitions 0 & 1 getting 1 FlowFile each and Partition 2
+        // getting 2 FlowFiles. Then, when Partition 2 is removed, those 2 FlowFiles will be rebalanced to Partitions 0 and 1.
         queue.setFlowFilePartitioner(new StaticSequencePartitioner(new int[] {0, 1, 2, 2, 0, 1}, false));
 
         for (int i = 0; i < 4; i++) {
@@ -359,6 +366,7 @@ public class TestSocketLoadBalancedFlowFileQueue {
 
         final int[] expectedPartitionSizes = new int[] {2, 2};
         final int[] partitionSizes = new int[2];
+
         while (!Arrays.equals(expectedPartitionSizes, partitionSizes)) {
             Thread.sleep(10L);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/32db43b3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
new file mode 100644
index 0000000..b8fed54
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
@@ -0,0 +1,868 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.scheduling;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.nifi.admin.service.AuditService;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.events.VolatileBulletinRepository;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.SystemBundle;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.provenance.MockProvenanceRepository;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.variable.FileBasedVariableRegistry;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+import java.util.function.Supplier;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Validate Processor's life-cycle operation within the context of
+ * {@link FlowController} and {@link StandardProcessScheduler}
+ */
+public class ProcessorLifecycleIT {
+
+    private static final Logger logger = LoggerFactory.getLogger(ProcessorLifecycleIT.class);
+    private static final long SHORT_DELAY_TOLERANCE = 10000L;
+    private static final long MEDIUM_DELAY_TOLERANCE = 15000L;
+    private static final long LONG_DELAY_TOLERANCE = 20000L;
+
+    private FlowController fc;
+    private Map<String, String> properties = new HashMap<>();
+    private volatile String propsFile = "src/test/resources/lifecycletest.nifi.properties";
+
+    @Before
+    public void before() throws Exception {
+        properties.put("P", "hello");
+    }
+
+    @After
+    public void after() throws Exception {
+        fc.shutdown(true);
+        FileUtils.deleteDirectory(new File("./target/lifecycletest"));
+    }
+
+    private void assertCondition(final Supplier<Boolean> supplier) {
+        assertCondition(supplier, SHORT_DELAY_TOLERANCE);
+    }
+
+    private void assertCondition(final Supplier<Boolean> supplier, final long delayToleranceMillis) {
+        final long startTime = System.currentTimeMillis();
+        while (((System.currentTimeMillis() - startTime) < delayToleranceMillis) && !supplier.get()) {
+            try {
+                Thread.sleep(50);
+            } catch (InterruptedException ex) {
+                Thread.interrupted();
+                break;
+            }
+        }
+        assertTrue(supplier.get());
+    }
+
+    @Test
+    public void validateEnableOperation() throws Exception {
+        final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+        fc = fcsb.getFlowController();
+        ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
+        this.setControllerRootGroup(fc, testGroup);
+        final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(),
+                UUID.randomUUID().toString(), fcsb.getSystemBundle().getBundleDetails().getCoordinate());
+
+        assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState());
+        assertCondition(() -> ScheduledState.STOPPED == testProcNode.getPhysicalScheduledState());
+        // validates idempotency
+        for (int i = 0; i < 2; i++) {
+            testProcNode.enable();
+        }
+        assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState());
+        assertCondition(() -> ScheduledState.STOPPED == testProcNode.getPhysicalScheduledState());
+        testProcNode.disable();
+        assertCondition(() -> ScheduledState.DISABLED == testProcNode.getScheduledState());
+        assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState());
+    }
+
+    @Test
+    public void validateDisableOperation() throws Exception {
+        final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+        fc = fcsb.getFlowController();
+
+        ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
+        this.setControllerRootGroup(fc, testGroup);
+        final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(),
+                UUID.randomUUID().toString(), fcsb.getSystemBundle().getBundleDetails().getCoordinate());
+        testProcNode.setProperties(properties);
+        assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState());
+        assertCondition(() -> ScheduledState.STOPPED == testProcNode.getPhysicalScheduledState());
+        // validates idempotency
+        for (int i = 0; i < 2; i++) {
+            testProcNode.disable();
+        }
+        assertCondition(() -> ScheduledState.DISABLED == testProcNode.getScheduledState());
+        assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState());
+
+        ProcessScheduler ps = fc.getProcessScheduler();
+        testProcNode.performValidation();
+        ps.startProcessor(testProcNode, true);
+        assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState());
+    }
+
+    /**
+     * Will validate the idempotent nature of processor start operation which
+     * can be called multiple times without any side-effects.
+     */
+    @Test
+    public void validateIdempotencyOfProcessorStartOperation() throws Exception {
+        final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+        fc = fcsb.getFlowController();
+
+        ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
+        this.setControllerRootGroup(fc, testGroup);
+        final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+                fcsb.getSystemBundle().getBundleDetails().getCoordinate());
+        testProcNode.setProperties(properties);
+        TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
+
+        // sets the scenario for the processor to run
+        this.noop(testProcessor);
+        final ProcessScheduler ps = fc.getProcessScheduler();
+
+        testProcNode.performValidation();
+        ps.startProcessor(testProcNode, true);
+        ps.startProcessor(testProcNode, true);
+        ps.startProcessor(testProcNode, true);
+
+        Thread.sleep(500);
+        assertCondition(() -> testProcessor.operationNames.size() == 1);
+        assertEquals("@OnScheduled", testProcessor.operationNames.get(0));
+    }
+
+    /**
+     * Validates that stop calls are harmless and idempotent if processor is not
+     * in STARTING or RUNNING state.
+     */
+    @Test
+    public void validateStopCallsAreMeaninglessIfProcessorNotStarted() throws Exception {
+        final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+        fc = fcsb.getFlowController();
+        ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
+        this.setControllerRootGroup(fc, testGroup);
+        final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+                fcsb.getSystemBundle().getBundleDetails().getCoordinate());
+        testProcNode.setProperties(properties);
+        TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
+        assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState());
+        // sets the scenario for the processor to run
+        int randomDelayLimit = 3000;
+        this.randomOnTriggerDelay(testProcessor, randomDelayLimit);
+        final ProcessScheduler ps = fc.getProcessScheduler();
+        ps.stopProcessor(testProcNode);
+        assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState());
+        assertTrue(testProcessor.operationNames.isEmpty());
+    }
+
+    /**
+     * Validates the processors start/stop sequence where the order of
+     * operations can only be @OnScheduled, @OnUnscheduled, @OnStopped.
+     */
+    @Test
+    @Ignore
+    public void validateSuccessfulAndOrderlyShutdown() throws Exception {
+        final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+        fc = fcsb.getFlowController();
+        ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
+        this.setControllerRootGroup(fc, testGroup);
+        ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+                fcsb.getSystemBundle().getBundleDetails().getCoordinate());
+        testProcNode.setProperties(properties);
+        TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
+
+        // sets the scenario for the processor to run
+        int randomDelayLimit = 3000;
+        this.randomOnTriggerDelay(testProcessor, randomDelayLimit);
+
+        testProcNode.setMaxConcurrentTasks(4);
+        testProcNode.setScheduldingPeriod("500 millis");
+        testProcNode.setAutoTerminatedRelationships(Collections.singleton(new Relationship.Builder().name("success").build()));
+
+        testGroup.addProcessor(testProcNode);
+
+        fc.startProcessGroup(testGroup.getIdentifier());
+        assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), SHORT_DELAY_TOLERANCE);
+
+        fc.stopAllProcessors();
+
+        Thread.sleep(randomDelayLimit); // up to randomDelayLimit, otherwise next assertion may fail as the processor still executing
+
+        // validates that regardless of how many running tasks, lifecycle
+        // operation are invoked atomically (once each).
+        assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), SHORT_DELAY_TOLERANCE);
+        // . . . hence only 3 operations must be in the list
+        assertCondition(() -> testProcessor.operationNames.size() == 3, SHORT_DELAY_TOLERANCE);
+        // . . . and ordered as @OnScheduled, @OnUnscheduled, @OnStopped
+        assertEquals("@OnScheduled", testProcessor.operationNames.get(0));
+        assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1));
+        assertEquals("@OnStopped", testProcessor.operationNames.get(2));
+    }
+
+    /**
+     * Concurrency test that hammers on both the stop and the start operation
+     * to validate their idempotency. Half of the calls stop the processor and
+     * half start it, each after a random pause of up to 9 ms, from a pool of
+     * 100 threads. Afterwards the recorded lifecycle callbacks are checked for
+     * a legal ordering:
+     * <ul>
+     * <li>the first callback, and any callback after {@code @OnStopped}, must
+     * be {@code @OnScheduled}</li>
+     * <li>{@code @OnScheduled} must be followed by {@code @OnUnscheduled}</li>
+     * <li>{@code @OnUnscheduled} must be followed by {@code @OnStopped} or
+     * {@code @OnScheduled}</li>
+     * </ul>
+     */
+    @Test
+    @Ignore
+    public void validateLifecycleOperationOrderWithConcurrentCallsToStartStop() throws Exception {
+        final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+        fc = fcsb.getFlowController();
+
+        ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
+        this.setControllerRootGroup(fc, testGroup);
+        final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+                fcsb.getSystemBundle().getBundleDetails().getCoordinate());
+        testProcNode.setProperties(properties);
+        TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
+
+        // sets the scenario for the processor to run
+        this.noop(testProcessor);
+
+        final ProcessScheduler ps = fc.getProcessScheduler();
+        final int startCallsCount = 10000;
+        final CountDownLatch countDownCounter = new CountDownLatch(startCallsCount);
+        assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState());
+        final Random random = new Random();
+
+        // The pool is created right before it is used and released in the
+        // finally block so that a failed assertion does not leak 100 threads.
+        final ExecutorService executor = Executors.newFixedThreadPool(100);
+        try {
+            for (int i = 0; i < startCallsCount / 2; i++) {
+                executor.execute(() -> {
+                    LockSupport.parkNanos(random.nextInt(9000000));
+                    ps.stopProcessor(testProcNode);
+                    countDownCounter.countDown();
+                });
+            }
+            for (int i = 0; i < startCallsCount / 2; i++) {
+                executor.execute(() -> {
+                    LockSupport.parkNanos(random.nextInt(9000000));
+                    ps.startProcessor(testProcNode, true);
+                    countDownCounter.countDown();
+                });
+            }
+            assertTrue(countDownCounter.await(1000000, TimeUnit.MILLISECONDS));
+
+            String previousOperation = null;
+            for (String operationName : testProcessor.operationNames) {
+                if (previousOperation == null || previousOperation.equals("@OnStopped")) {
+                    assertEquals("@OnScheduled", operationName);
+                } else if (previousOperation.equals("@OnScheduled")) {
+                    assertEquals("@OnUnscheduled", operationName);
+                } else if (previousOperation.equals("@OnUnscheduled")) {
+                    assertTrue(operationName.equals("@OnStopped") || operationName.equals("@OnScheduled"));
+                }
+                previousOperation = operationName;
+            }
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+
+    /**
+     * Starts a processor whose {@code @OnScheduled} method sleeps for 200 ms,
+     * stops it again, and verifies that exactly three lifecycle callbacks were
+     * recorded in the order {@code @OnScheduled}, {@code @OnUnscheduled},
+     * {@code @OnStopped}.
+     */
+    @Test
+    public void validateProcessorUnscheduledAndStoppedWhenStopIsCalledBeforeProcessorFullyStarted() throws Exception {
+        final FlowControllerAndSystemBundle controllerAndBundle = buildFlowControllerForTest();
+        fc = controllerAndBundle.getFlowController();
+
+        final ProcessGroup rootGroup = fc.createProcessGroup(UUID.randomUUID().toString());
+        setControllerRootGroup(fc, rootGroup);
+        final ProcessorNode procNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+                controllerAndBundle.getSystemBundle().getBundleDetails().getCoordinate());
+        procNode.setProperties(properties);
+        final TestProcessor processor = (TestProcessor) procNode.getProcessor();
+
+        // scenario: @OnScheduled takes a fixed 200 ms, all other callbacks are no-ops
+        final int onScheduledDelayMillis = 200;
+        longRunningOnSchedule(processor, onScheduledDelayMillis);
+        final ProcessScheduler scheduler = fc.getProcessScheduler();
+
+        procNode.performValidation();
+        scheduler.startProcessor(procNode, true);
+        assertCondition(() -> procNode.getScheduledState() == ScheduledState.RUNNING, MEDIUM_DELAY_TOLERANCE);
+
+        scheduler.stopProcessor(procNode);
+        assertCondition(() -> procNode.getScheduledState() == ScheduledState.STOPPED, MEDIUM_DELAY_TOLERANCE);
+
+        // one invocation of each lifecycle method, in lifecycle order
+        assertCondition(() -> processor.operationNames.size() == 3, LONG_DELAY_TOLERANCE);
+        assertEquals("@OnScheduled", processor.operationNames.get(0));
+        assertEquals("@OnUnscheduled", processor.operationNames.get(1));
+        assertEquals("@OnStopped", processor.operationNames.get(2));
+    }
+
+    /**
+     * Validates that the processor is eventually started once invocation of
+     * {@code @OnScheduled} stops throwing exceptions. The processor is set up
+     * to fail its first two {@code @OnScheduled} invocations and must still
+     * reach RUNNING, after which it is stopped again.
+     */
+    @Test
+    public void validateProcessScheduledAfterAdministrativeDelayDueToTheOnScheduledException() throws Exception {
+        final FlowControllerAndSystemBundle controllerAndBundle = buildFlowControllerForTest();
+        fc = controllerAndBundle.getFlowController();
+
+        final ProcessGroup rootGroup = fc.createProcessGroup(UUID.randomUUID().toString());
+        setControllerRootGroup(fc, rootGroup);
+        final ProcessorNode procNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+                controllerAndBundle.getSystemBundle().getBundleDetails().getCoordinate());
+        procNode.setProperties(properties);
+        final TestProcessor processor = (TestProcessor) procNode.getProcessor();
+
+        // scenario: no-op callbacks, but @OnScheduled throws on its first two invocations
+        noop(processor);
+        processor.generateExceptionOnScheduled = true;
+        processor.keepFailingOnScheduledTimes = 2;
+        final ProcessScheduler scheduler = fc.getProcessScheduler();
+
+        procNode.performValidation();
+        scheduler.startProcessor(procNode, true);
+        assertCondition(() -> procNode.getScheduledState() == ScheduledState.RUNNING, LONG_DELAY_TOLERANCE);
+
+        scheduler.stopProcessor(procNode);
+        assertCondition(() -> procNode.getScheduledState() == ScheduledState.STOPPED, SHORT_DELAY_TOLERANCE);
+    }
+
+    /**
+     * Validates that the processor can be stopped while {@code @OnScheduled}
+     * keeps failing, i.e. that a user-initiated stopProcessor ends the
+     * re-try cycle. {@code @OnUnscheduled} additionally takes 100 ms in this
+     * scenario.
+     */
+    @Test
+    public void validateProcessorCanBeStoppedWhenOnScheduledConstantlyFails() throws Exception {
+        final FlowControllerAndSystemBundle controllerAndBundle = buildFlowControllerForTest();
+        fc = controllerAndBundle.getFlowController();
+
+        final ProcessGroup rootGroup = fc.createProcessGroup(UUID.randomUUID().toString());
+        setControllerRootGroup(fc, rootGroup);
+        final ProcessorNode procNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+                controllerAndBundle.getSystemBundle().getBundleDetails().getCoordinate());
+        procNode.setProperties(properties);
+        final TestProcessor processor = (TestProcessor) procNode.getProcessor();
+
+        // scenario: @OnScheduled throws on (practically) every invocation
+        longRunningOnUnschedule(processor, 100);
+        processor.generateExceptionOnScheduled = true;
+        processor.keepFailingOnScheduledTimes = Integer.MAX_VALUE;
+        final ProcessScheduler scheduler = fc.getProcessScheduler();
+
+        procNode.performValidation();
+        scheduler.startProcessor(procNode, true);
+        assertCondition(() -> procNode.getScheduledState() == ScheduledState.RUNNING, SHORT_DELAY_TOLERANCE);
+
+        scheduler.stopProcessor(procNode);
+        assertCondition(() -> procNode.getScheduledState() == ScheduledState.STOPPED, SHORT_DELAY_TOLERANCE);
+    }
+
+    /**
+     * Validates that the processor can be stopped when {@code @OnScheduled}
+     * blocks indefinitely but reacts to thread interrupts. The flow controller
+     * is built with a processor scheduling timeout of "5 sec".
+     */
+    @Test
+    public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyInterruptable() throws Exception {
+        final FlowControllerAndSystemBundle controllerAndBundle =
+                buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec");
+        fc = controllerAndBundle.getFlowController();
+
+        final ProcessGroup rootGroup = fc.createProcessGroup(UUID.randomUUID().toString());
+        setControllerRootGroup(fc, rootGroup);
+        final ProcessorNode procNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+                controllerAndBundle.getSystemBundle().getBundleDetails().getCoordinate());
+        procNode.setProperties(properties);
+        final TestProcessor processor = (TestProcessor) procNode.getProcessor();
+
+        // scenario: @OnScheduled sleeps "forever" and returns only when interrupted
+        blockingInterruptableOnUnschedule(processor);
+        final ProcessScheduler scheduler = fc.getProcessScheduler();
+
+        procNode.performValidation();
+        scheduler.startProcessor(procNode, true);
+        assertCondition(() -> procNode.getScheduledState() == ScheduledState.RUNNING, SHORT_DELAY_TOLERANCE);
+
+        scheduler.stopProcessor(procNode);
+        assertCondition(() -> procNode.getScheduledState() == ScheduledState.STOPPED, MEDIUM_DELAY_TOLERANCE);
+    }
+
+    /**
+     * Validates that the processor can be stopped when {@code @OnScheduled}
+     * blocks indefinitely and ignores thread interrupts. The flow controller
+     * is built with a processor scheduling timeout of "1 sec".
+     */
+    @Test
+    public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyUninterruptable() throws Exception {
+        final FlowControllerAndSystemBundle controllerAndBundle =
+                buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "1 sec");
+        fc = controllerAndBundle.getFlowController();
+
+        final ProcessGroup rootGroup = fc.createProcessGroup(UUID.randomUUID().toString());
+        setControllerRootGroup(fc, rootGroup);
+        final ProcessorNode procNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+                controllerAndBundle.getSystemBundle().getBundleDetails().getCoordinate());
+        procNode.setProperties(properties);
+        final TestProcessor processor = (TestProcessor) procNode.getProcessor();
+
+        // scenario: @OnScheduled sleeps "forever" and swallows every interrupt
+        blockingUninterruptableOnUnschedule(processor);
+        final ProcessScheduler scheduler = fc.getProcessScheduler();
+
+        procNode.performValidation();
+        scheduler.startProcessor(procNode, true);
+        assertCondition(() -> procNode.getScheduledState() == ScheduledState.RUNNING, MEDIUM_DELAY_TOLERANCE);
+
+        scheduler.stopProcessor(procNode);
+        assertCondition(() -> procNode.getScheduledState() == ScheduledState.STOPPED, MEDIUM_DELAY_TOLERANCE);
+    }
+
+    /**
+     * Validates that the processor can be stopped even if onTrigger() keeps
+     * throwing exceptions. Along the way a disable request is issued while the
+     * processor is running, and the test expects the state to remain RUNNING
+     * until the explicit stop.
+     */
+    @Test
+    public void validateProcessorCanBeStoppedWhenOnTriggerThrowsException() throws Exception {
+        final FlowControllerAndSystemBundle controllerAndBundle = buildFlowControllerForTest();
+        fc = controllerAndBundle.getFlowController();
+
+        final ProcessGroup rootGroup = fc.createProcessGroup(UUID.randomUUID().toString());
+        setControllerRootGroup(fc, rootGroup);
+        final ProcessorNode procNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+                controllerAndBundle.getSystemBundle().getBundleDetails().getCoordinate());
+        procNode.setProperties(properties);
+        final TestProcessor processor = (TestProcessor) procNode.getProcessor();
+
+        // scenario: no-op lifecycle callbacks, onTrigger() always throws
+        noop(processor);
+        processor.generateExceptionOnTrigger = true;
+        final ProcessScheduler scheduler = fc.getProcessScheduler();
+
+        procNode.performValidation();
+        scheduler.startProcessor(procNode, true);
+        assertCondition(() -> procNode.getScheduledState() == ScheduledState.RUNNING, LONG_DELAY_TOLERANCE);
+
+        // disabling while running is expected to leave the processor RUNNING
+        scheduler.disableProcessor(procNode);
+        assertCondition(() -> procNode.getScheduledState() == ScheduledState.RUNNING, LONG_DELAY_TOLERANCE);
+
+        scheduler.stopProcessor(procNode);
+        assertCondition(() -> procNode.getScheduledState() == ScheduledState.STOPPED, LONG_DELAY_TOLERANCE);
+    }
+
+    /**
+     * Validates that starting a processor fails with an
+     * IllegalStateException when PropertyDescriptor validation fails: no
+     * properties are set on the processor here, so its required property is
+     * missing.
+     */
+    @Test(expected = IllegalStateException.class)
+    public void validateStartFailsOnInvalidProcessorWithMissingProperty() throws Exception {
+        final FlowControllerAndSystemBundle controllerAndBundle = buildFlowControllerForTest();
+        fc = controllerAndBundle.getFlowController();
+
+        final ProcessGroup rootGroup = fc.createProcessGroup(UUID.randomUUID().toString());
+        setControllerRootGroup(fc, rootGroup);
+        final ProcessorNode procNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+                controllerAndBundle.getSystemBundle().getBundleDetails().getCoordinate());
+
+        final ProcessScheduler scheduler = fc.getProcessScheduler();
+        scheduler.startProcessor(procNode, true);
+
+        // only reached when startProcessor did not throw
+        fail();
+    }
+
+    /**
+     * Validates that starting a processor fails with an
+     * IllegalStateException when ControllerService validation fails: the
+     * processor references a controller service that is created but never
+     * enabled.
+     */
+    @Test(expected = IllegalStateException.class)
+    public void validateStartFailsOnInvalidProcessorWithDisabledService() throws Exception {
+        final FlowControllerAndSystemBundle controllerAndBundle = buildFlowControllerForTest();
+        fc = controllerAndBundle.getFlowController();
+
+        final ProcessGroup rootGroup = fc.createProcessGroup(UUID.randomUUID().toString());
+        setControllerRootGroup(fc, rootGroup);
+
+        final ControllerServiceNode serviceNode = fc.createControllerService(TestService.class.getName(), "serv",
+                controllerAndBundle.getSystemBundle().getBundleDetails().getCoordinate(), null, true);
+        final ProcessorNode procNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+                controllerAndBundle.getSystemBundle().getBundleDetails().getCoordinate());
+
+        // point the processor's "S" property at the (not enabled) service
+        properties.put("S", serviceNode.getIdentifier());
+        procNode.setProperties(properties);
+
+        final TestProcessor processor = (TestProcessor) procNode.getProcessor();
+        processor.withService = true;
+
+        final ProcessScheduler scheduler = fc.getProcessScheduler();
+        scheduler.startProcessor(procNode, true);
+
+        // only reached when startProcessor did not throw
+        fail();
+    }
+
+    /**
+     * Validates that a processor with a ControllerService dependency starts
+     * successfully once that service has been validated and enabled.
+     */
+    @Test
+    public void validateStartSucceedsOnProcessorWithEnabledService() throws Exception {
+        final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+        fc = fcsb.getFlowController();
+
+        ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
+        this.setControllerRootGroup(fc, testGroup);
+
+        ControllerServiceNode testServiceNode = fc.createControllerService(TestService.class.getName(), "foo",
+                fcsb.getSystemBundle().getBundleDetails().getCoordinate(), null, true);
+        testGroup.addControllerService(testServiceNode);
+
+        final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+                fcsb.getSystemBundle().getBundleDetails().getCoordinate());
+        testGroup.addProcessor(testProcNode);
+
+        properties.put("S", testServiceNode.getIdentifier());
+        testProcNode.setProperties(properties);
+
+        TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
+        testProcessor.withService = true;
+        this.noop(testProcessor);
+
+        ProcessScheduler ps = fc.getProcessScheduler();
+        testServiceNode.performValidation();
+        ps.enableControllerService(testServiceNode);
+
+        testProcNode.performValidation();
+        ps.startProcessor(testProcNode, true);
+
+        // Wait for the RUNNING state with the same helper the other tests use
+        // instead of sleeping a fixed 500 ms and checking once, which is
+        // timing-dependent on a slow or loaded machine.
+        assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), LONG_DELAY_TOLERANCE);
+    }
+
+
+    /**
+     * Scenario where onTrigger() is executed with random delay limited to
+     * 'delayLimit', yet with guaranteed exit from onTrigger().
+     */
+    private void randomOnTriggerDelay(TestProcessor testProcessor, int delayLimit) {
+        EmptyRunnable emptyRunnable = new EmptyRunnable();
+        RandomOrFixedDelayedRunnable delayedRunnable = new RandomOrFixedDelayedRunnable(delayLimit, true);
+        testProcessor.setScenario(emptyRunnable, emptyRunnable, emptyRunnable, delayedRunnable);
+    }
+
+    /**
+     * Scenario where @OnSchedule is executed with delay limited to
+     * 'delayLimit'.
+     */
+    private void longRunningOnSchedule(TestProcessor testProcessor, int delayLimit) {
+        EmptyRunnable emptyRunnable = new EmptyRunnable();
+        RandomOrFixedDelayedRunnable delayedRunnable = new RandomOrFixedDelayedRunnable(delayLimit, false);
+        testProcessor.setScenario(delayedRunnable, emptyRunnable, emptyRunnable, emptyRunnable);
+    }
+
+    /**
+     * Scenario where @OnUnschedule is executed with delay limited to
+     * 'delayLimit'.
+     */
+    private void longRunningOnUnschedule(TestProcessor testProcessor, int delayLimit) {
+        EmptyRunnable emptyRunnable = new EmptyRunnable();
+        RandomOrFixedDelayedRunnable delayedRunnable = new RandomOrFixedDelayedRunnable(delayLimit, false);
+        testProcessor.setScenario(emptyRunnable, delayedRunnable, emptyRunnable, emptyRunnable);
+    }
+
+    /**
+     * Scenario where @OnSchedule blocks indefinitely yet interruptible.
+     */
+    private void blockingInterruptableOnUnschedule(TestProcessor testProcessor) {
+        EmptyRunnable emptyRunnable = new EmptyRunnable();
+        BlockingInterruptableRunnable blockingRunnable = new BlockingInterruptableRunnable();
+        testProcessor.setScenario(blockingRunnable, emptyRunnable, emptyRunnable, emptyRunnable);
+    }
+
+    /**
+     * Scenario where @OnSchedule blocks indefinitely and un-interruptible.
+     */
+    private void blockingUninterruptableOnUnschedule(TestProcessor testProcessor) {
+        EmptyRunnable emptyRunnable = new EmptyRunnable();
+        BlockingUninterruptableRunnable blockingRunnable = new BlockingUninterruptableRunnable();
+        testProcessor.setScenario(blockingRunnable, emptyRunnable, emptyRunnable, emptyRunnable);
+    }
+
+    /**
+     * Scenario where all tasks are no op.
+     */
+    private void noop(TestProcessor testProcessor) {
+        EmptyRunnable emptyRunnable = new EmptyRunnable();
+        testProcessor.setScenario(emptyRunnable, emptyRunnable, emptyRunnable, emptyRunnable);
+    }
+
+    /**
+     * Creates a standalone FlowController, together with the system bundle it
+     * was built against, from the test properties file plus a fixed set of
+     * overrides. When both 'propKey' and 'propValue' are non-null that pair is
+     * added last, so it can replace any of the fixed overrides.
+     */
+    private FlowControllerAndSystemBundle buildFlowControllerForTest(final String propKey, final String propValue) throws Exception {
+        final Map<String, String> overrides = new HashMap<>();
+        overrides.put(NiFiProperties.ADMINISTRATIVE_YIELD_DURATION, "1 sec");
+        overrides.put(NiFiProperties.STATE_MANAGEMENT_CONFIG_FILE, "target/test-classes/state-management.xml");
+        overrides.put(NiFiProperties.STATE_MANAGEMENT_LOCAL_PROVIDER_ID, "local-provider");
+        overrides.put(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, MockProvenanceRepository.class.getName());
+        overrides.put("nifi.remote.input.socket.port", "");
+        overrides.put("nifi.remote.input.secure", "");
+
+        final boolean hasExtraProperty = propKey != null && propValue != null;
+        if (hasExtraProperty) {
+            overrides.put(propKey, propValue);
+        }
+
+        final NiFiProperties testProperties = NiFiProperties.createBasicNiFiProperties(propsFile, overrides);
+
+        // extensions are discovered from the system bundle only
+        final Bundle bundle = SystemBundle.create(testProperties);
+        ExtensionManager.discoverExtensions(bundle, Collections.emptySet());
+
+        // collaborators are created in the same order the factory call lists them
+        final FlowFileEventRepository eventRepository = mock(FlowFileEventRepository.class);
+        final Authorizer authorizer = mock(Authorizer.class);
+        final AuditService auditService = mock(AuditService.class);
+        final VolatileBulletinRepository bulletinRepository = new VolatileBulletinRepository();
+        final FileBasedVariableRegistry variableRegistry =
+                new FileBasedVariableRegistry(testProperties.getVariableRegistryPropertiesPaths());
+        final FlowRegistryClient registryClient = mock(FlowRegistryClient.class);
+
+        final FlowController controller = FlowController.createStandaloneInstance(eventRepository, testProperties,
+                authorizer, auditService, null, bulletinRepository, variableRegistry, registryClient);
+
+        return new FlowControllerAndSystemBundle(controller, bundle);
+    }
+
+    private FlowControllerAndSystemBundle buildFlowControllerForTest() throws Exception {
+        return buildFlowControllerForTest(null, null);
+    }
+
+    /**
+     *
+     */
+    private void setControllerRootGroup(FlowController controller, ProcessGroup processGroup) {
+        try {
+            Method m = FlowController.class.getDeclaredMethod("setRootGroup", ProcessGroup.class);
+            m.setAccessible(true);
+            m.invoke(controller, processGroup);
+            controller.initializeFlow();
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to set root group", e);
+        }
+    }
+
+    /**
+     * Immutable pair of a FlowController and the system Bundle it was created
+     * with, so a test can obtain both from a single factory call.
+     */
+    private static class FlowControllerAndSystemBundle {
+
+        private final FlowController flowController;
+        private final Bundle systemBundle;
+
+        public FlowControllerAndSystemBundle(FlowController controller, Bundle bundle) {
+            flowController = controller;
+            systemBundle = bundle;
+        }
+
+        /** Returns the controller passed to the constructor. */
+        public FlowController getFlowController() {
+            return this.flowController;
+        }
+
+        /** Returns the bundle passed to the constructor. */
+        public Bundle getSystemBundle() {
+            return this.systemBundle;
+        }
+    }
+
+    /**
+     * Processor used by the lifecycle tests. Each lifecycle method records the
+     * name of its annotation in {@code operationNames} and then runs a
+     * pluggable callback (see {@link #setScenario}). Flags allow
+     * {@code @OnScheduled} and onTrigger() to throw on demand.
+     */
+    public static class TestProcessor extends AbstractProcessor {
+        private static final Runnable NOP = () -> {};
+
+        // The callbacks and flags below are set by the test thread, while the
+        // lifecycle methods may run on other threads (the concurrency test
+        // drives start/stop from a thread pool). They are volatile so that
+        // the latest value is visible to whichever thread reads it.
+        private volatile Runnable onScheduleCallback = NOP;
+        private volatile Runnable onUnscheduleCallback = NOP;
+        private volatile Runnable onStopCallback = NOP;
+        private volatile Runnable onTriggerCallback = NOP;
+
+        private volatile boolean generateExceptionOnScheduled;
+        private volatile boolean generateExceptionOnTrigger;
+
+        private volatile boolean withService;
+
+        // number of initial @OnScheduled invocations that should throw
+        private volatile int keepFailingOnScheduledTimes;
+
+        // how many @OnScheduled invocations have been counted towards the limit above
+        private int onScheduledExceptionCount;
+
+        // Copy-on-write list: lifecycle methods append to it while the test
+        // thread reads and iterates it, which a plain LinkedList does not support.
+        private final List<String> operationNames = new java.util.concurrent.CopyOnWriteArrayList<>();
+
+        /**
+         * Installs the callbacks that the respective lifecycle methods run
+         * after recording their invocation.
+         */
+        void setScenario(Runnable onScheduleCallback, Runnable onUnscheduleCallback, Runnable onStopCallback,
+                Runnable onTriggerCallback) {
+            this.onScheduleCallback = onScheduleCallback;
+            this.onUnscheduleCallback = onUnscheduleCallback;
+            this.onStopCallback = onStopCallback;
+            this.onTriggerCallback = onTriggerCallback;
+        }
+
+        /**
+         * Records "@OnScheduled"; throws while failures are requested and the
+         * configured number of failures has not been reached, otherwise runs
+         * the schedule callback.
+         */
+        @OnScheduled
+        public void schedule(ProcessContext ctx) {
+            this.operationNames.add("@OnScheduled");
+            if (this.generateExceptionOnScheduled
+                    && this.onScheduledExceptionCount++ < this.keepFailingOnScheduledTimes) {
+                throw new RuntimeException("Intentional");
+            }
+            this.onScheduleCallback.run();
+        }
+
+        /** Records "@OnUnscheduled" and runs the unschedule callback. */
+        @OnUnscheduled
+        public void unschedule() {
+            this.operationNames.add("@OnUnscheduled");
+            this.onUnscheduleCallback.run();
+        }
+
+        /** Records "@OnStopped" and runs the stop callback. */
+        @OnStopped
+        public void stop() {
+            this.operationNames.add("@OnStopped");
+            this.onStopCallback.run();
+        }
+
+        /**
+         * Returns the required property "P" (valid when non-empty) and, when
+         * {@code withService} is set, also the required controller service
+         * property "S".
+         */
+        @Override
+        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+            final PropertyDescriptor requiredProperty = new PropertyDescriptor.Builder()
+                    .name("P")
+                    .description("Blah Blah")
+                    .required(true)
+                    .addValidator(new Validator() {
+                        @Override
+                        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+                            return new ValidationResult.Builder().subject(subject).input(value).valid(value != null && !value.isEmpty()).explanation(subject + " cannot be empty").build();
+                        }
+                    })
+                    .build();
+
+            final PropertyDescriptor serviceProperty = new PropertyDescriptor.Builder()
+                    .name("S")
+                    .description("Blah Blah")
+                    .required(true)
+                    .identifiesControllerService(ITestservice.class)
+                    .build();
+
+            return this.withService ? Arrays.asList(requiredProperty, serviceProperty)
+                    : Arrays.asList(requiredProperty);
+        }
+
+        /**
+         * Throws when failures on trigger are requested, otherwise runs the
+         * trigger callback.
+         */
+        @Override
+        public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+            if (this.generateExceptionOnTrigger) {
+                throw new RuntimeException("Intentional");
+            }
+            this.onTriggerCallback.run();
+        }
+    }
+
+    /**
+     * Controller service used by the tests. It adds no members of its own and
+     * serves as the concrete {@link ITestservice} that tests create by class
+     * name.
+     */
+    public static class TestService extends AbstractControllerService implements ITestservice {
+
+    }
+
+    /**
+     * Marker controller service interface with no methods of its own; the
+     * test processor's "S" property identifies services of this type.
+     */
+    public static interface ITestservice extends ControllerService {
+
+    }
+
+    /**
+     * Runnable that does nothing; used for callbacks a scenario does not care
+     * about.
+     */
+    private static class EmptyRunnable implements Runnable {
+
+        @Override
+        public void run() {
+            // intentionally left empty
+        }
+    }
+
+    /**
+     * Runnable that sleeps for {@code Long.MAX_VALUE} milliseconds. When the
+     * sleeping thread is interrupted it restores the interrupt flag and
+     * returns.
+     */
+    private static class BlockingInterruptableRunnable implements Runnable {
+
+        @Override
+        public void run() {
+            final long forever = Long.MAX_VALUE;
+            try {
+                Thread.sleep(forever);
+            } catch (InterruptedException interrupted) {
+                // hand the interrupt back to the caller and give up waiting
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Runnable that never returns: it sleeps for {@code Long.MAX_VALUE}
+     * milliseconds in an endless loop and swallows every interrupt.
+     */
+    private static class BlockingUninterruptableRunnable implements Runnable {
+
+        @Override
+        public void run() {
+            final long forever = Long.MAX_VALUE;
+            for (;;) {
+                try {
+                    Thread.sleep(forever);
+                } catch (InterruptedException ignored) {
+                    // deliberately ignored: this runnable models code that does not react to interrupts
+                }
+            }
+        }
+    }
+
+    /**
+     * Runnable that sleeps either for exactly 'delayLimit' milliseconds or,
+     * when 'randomDelay' is set, for a random number of milliseconds below
+     * 'delayLimit'. If interrupted while sleeping it logs a warning, restores
+     * the interrupt flag and returns.
+     */
+    private static class RandomOrFixedDelayedRunnable implements Runnable {
+
+        private final int delayLimit;
+        private final boolean randomDelay;
+        // source of the random delay; only consulted when randomDelay is true
+        private final Random random = new Random();
+
+        public RandomOrFixedDelayedRunnable(int delayLimit, boolean randomDelay) {
+            this.delayLimit = delayLimit;
+            this.randomDelay = randomDelay;
+        }
+
+        @Override
+        public void run() {
+            try {
+                if (this.randomDelay) {
+                    Thread.sleep(this.random.nextInt(this.delayLimit));
+                } else {
+                    Thread.sleep(this.delayLimit);
+                }
+            } catch (InterruptedException e) {
+                logger.warn("Interrupted while sleeping");
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+}