You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/11/06 16:24:04 UTC

[3/9] nifi git commit: NIFI-5769: Refactored FlowController to use Composition over Inheritance - Ensure that when the root group is set, we register its ID in FlowManager

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
index fa164fc..55b015d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
@@ -33,6 +33,7 @@ import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.events.VolatileBulletinRepository;
@@ -57,7 +58,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.lang.reflect.Method;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -89,9 +89,10 @@ public class ProcessorLifecycleIT {
     private static final long MEDIUM_DELAY_TOLERANCE = 15000L;
     private static final long LONG_DELAY_TOLERANCE = 20000L;
 
-    private FlowController fc;
+    private FlowManager flowManager;
     private Map<String, String> properties = new HashMap<>();
     private volatile String propsFile = "src/test/resources/lifecycletest.nifi.properties";
+    private ProcessScheduler processScheduler;
 
     @Before
     public void before() throws Exception {
@@ -100,7 +101,11 @@ public class ProcessorLifecycleIT {
 
     @After
     public void after() throws Exception {
-        fc.shutdown(true);
+        if (processScheduler != null) {
+            processScheduler.shutdown();
+            processScheduler = null;
+        }
+
         FileUtils.deleteDirectory(new File("./target/lifecycletest"));
     }
 
@@ -123,11 +128,10 @@ public class ProcessorLifecycleIT {
 
     @Test
     public void validateEnableOperation() throws Exception {
-        final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
-        fc = fcsb.getFlowController();
-        ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
-        this.setControllerRootGroup(fc, testGroup);
-        final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(),
+        final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+        flowManager = fcsb.getFlowManager();
+        ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
+        final ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(),
                 UUID.randomUUID().toString(), fcsb.getSystemBundle().getBundleDetails().getCoordinate());
 
         assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState());
@@ -145,12 +149,11 @@ public class ProcessorLifecycleIT {
 
     @Test
     public void validateDisableOperation() throws Exception {
-        final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
-        fc = fcsb.getFlowController();
+        final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+        flowManager = fcsb.getFlowManager();
 
-        ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
-        this.setControllerRootGroup(fc, testGroup);
-        final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(),
+        ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
+        final ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(),
                 UUID.randomUUID().toString(), fcsb.getSystemBundle().getBundleDetails().getCoordinate());
         testProcNode.setProperties(properties);
         assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState());
@@ -162,9 +165,8 @@ public class ProcessorLifecycleIT {
         assertCondition(() -> ScheduledState.DISABLED == testProcNode.getScheduledState());
         assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState());
 
-        ProcessScheduler ps = fc.getProcessScheduler();
         testProcNode.performValidation();
-        ps.startProcessor(testProcNode, true);
+        processScheduler.startProcessor(testProcNode, true);
         assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState());
     }
 
@@ -174,24 +176,22 @@ public class ProcessorLifecycleIT {
      */
     @Test
     public void validateIdempotencyOfProcessorStartOperation() throws Exception {
-        final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
-        fc = fcsb.getFlowController();
+        final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+        flowManager = fcsb.getFlowManager();
 
-        ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
-        this.setControllerRootGroup(fc, testGroup);
-        final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+        ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
+        final ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
                 fcsb.getSystemBundle().getBundleDetails().getCoordinate());
         testProcNode.setProperties(properties);
         TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
 
         // sets the scenario for the processor to run
         this.noop(testProcessor);
-        final ProcessScheduler ps = fc.getProcessScheduler();
 
         testProcNode.performValidation();
-        ps.startProcessor(testProcNode, true);
-        ps.startProcessor(testProcNode, true);
-        ps.startProcessor(testProcNode, true);
+        processScheduler.startProcessor(testProcNode, true);
+        processScheduler.startProcessor(testProcNode, true);
+        processScheduler.startProcessor(testProcNode, true);
 
         Thread.sleep(500);
         assertCondition(() -> testProcessor.operationNames.size() == 1);
@@ -204,11 +204,10 @@ public class ProcessorLifecycleIT {
      */
     @Test
     public void validateStopCallsAreMeaninglessIfProcessorNotStarted() throws Exception {
-        final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
-        fc = fcsb.getFlowController();
-        ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
-        this.setControllerRootGroup(fc, testGroup);
-        final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+        final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+        flowManager = fcsb.getFlowManager();
+        ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
+        final ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
                 fcsb.getSystemBundle().getBundleDetails().getCoordinate());
         testProcNode.setProperties(properties);
         TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
@@ -216,8 +215,8 @@ public class ProcessorLifecycleIT {
         // sets the scenario for the processor to run
         int randomDelayLimit = 3000;
         this.randomOnTriggerDelay(testProcessor, randomDelayLimit);
-        final ProcessScheduler ps = fc.getProcessScheduler();
-        ps.stopProcessor(testProcNode);
+
+        processScheduler.stopProcessor(testProcNode);
         assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState());
         assertTrue(testProcessor.operationNames.isEmpty());
     }
@@ -229,11 +228,10 @@ public class ProcessorLifecycleIT {
     @Test
     @Ignore
     public void validateSuccessfulAndOrderlyShutdown() throws Exception {
-        final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
-        fc = fcsb.getFlowController();
-        ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
-        this.setControllerRootGroup(fc, testGroup);
-        ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+        final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+        flowManager = fcsb.getFlowManager();
+        ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
+        ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
                 fcsb.getSystemBundle().getBundleDetails().getCoordinate());
         testProcNode.setProperties(properties);
         TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
@@ -248,10 +246,10 @@ public class ProcessorLifecycleIT {
 
         testGroup.addProcessor(testProcNode);
 
-        fc.startProcessGroup(testGroup.getIdentifier());
+        flowManager.getGroup(testGroup.getIdentifier()).startProcessing();
         assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), SHORT_DELAY_TOLERANCE);
 
-        fc.stopAllProcessors();
+        flowManager.getRootGroup().stopProcessing();
 
         Thread.sleep(randomDelayLimit); // up to randomDelayLimit, otherwise next assertion may fail as the processor still executing
 
@@ -273,12 +271,11 @@ public class ProcessorLifecycleIT {
     @Test
     @Ignore
     public void validateLifecycleOperationOrderWithConcurrentCallsToStartStop() throws Exception {
-        final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
-        fc = fcsb.getFlowController();
+        final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+        flowManager = fcsb.getFlowManager();
 
-        ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
-        this.setControllerRootGroup(fc, testGroup);
-        final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+        ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
+        final ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
                 fcsb.getSystemBundle().getBundleDetails().getCoordinate());
         testProcNode.setProperties(properties);
         TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
@@ -286,7 +283,6 @@ public class ProcessorLifecycleIT {
         // sets the scenario for the processor to run
         this.noop(testProcessor);
 
-        final ProcessScheduler ps = fc.getProcessScheduler();
         ExecutorService executor = Executors.newFixedThreadPool(100);
         int startCallsCount = 10000;
         final CountDownLatch countDownCounter = new CountDownLatch(startCallsCount);
@@ -297,7 +293,7 @@ public class ProcessorLifecycleIT {
                 @Override
                 public void run() {
                     LockSupport.parkNanos(random.nextInt(9000000));
-                    ps.stopProcessor(testProcNode);
+                    processScheduler.stopProcessor(testProcNode);
                     countDownCounter.countDown();
                 }
             });
@@ -307,7 +303,7 @@ public class ProcessorLifecycleIT {
                 @Override
                 public void run() {
                     LockSupport.parkNanos(random.nextInt(9000000));
-                    ps.startProcessor(testProcNode, true);
+                    processScheduler.startProcessor(testProcNode, true);
                     countDownCounter.countDown();
                 }
             });
@@ -332,12 +328,11 @@ public class ProcessorLifecycleIT {
      */
     @Test
     public void validateProcessorUnscheduledAndStoppedWhenStopIsCalledBeforeProcessorFullyStarted() throws Exception {
-        final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
-        fc = fcsb.getFlowController();
+        final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+        flowManager = fcsb.getFlowManager();
 
-        ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
-        this.setControllerRootGroup(fc, testGroup);
-        ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+        ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
+        ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
                 fcsb.getSystemBundle().getBundleDetails().getCoordinate());
         testProcNode.setProperties(properties);
         TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
@@ -345,13 +340,12 @@ public class ProcessorLifecycleIT {
         // sets the scenario for the processor to run
         int delay = 200;
         this.longRunningOnSchedule(testProcessor, delay);
-        ProcessScheduler ps = fc.getProcessScheduler();
 
         testProcNode.performValidation();
-        ps.startProcessor(testProcNode, true);
+        processScheduler.startProcessor(testProcNode, true);
         assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), MEDIUM_DELAY_TOLERANCE);
 
-        ps.stopProcessor(testProcNode);
+        processScheduler.stopProcessor(testProcNode);
         assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), MEDIUM_DELAY_TOLERANCE);
         assertCondition(() -> testProcessor.operationNames.size() == 3, LONG_DELAY_TOLERANCE);
         assertEquals("@OnScheduled", testProcessor.operationNames.get(0));
@@ -366,12 +360,11 @@ public class ProcessorLifecycleIT {
      */
     @Test
     public void validateProcessScheduledAfterAdministrativeDelayDueToTheOnScheduledException() throws Exception {
-        final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
-        fc = fcsb.getFlowController();
+        final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+        flowManager = fcsb.getFlowManager();
 
-        ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
-        this.setControllerRootGroup(fc, testGroup);
-        ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+        ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
+        ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
                 fcsb.getSystemBundle().getBundleDetails().getCoordinate());
         testProcNode.setProperties(properties);
         TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
@@ -380,12 +373,11 @@ public class ProcessorLifecycleIT {
         this.noop(testProcessor);
         testProcessor.generateExceptionOnScheduled = true;
         testProcessor.keepFailingOnScheduledTimes = 2;
-        ProcessScheduler ps = fc.getProcessScheduler();
 
         testProcNode.performValidation();
-        ps.startProcessor(testProcNode, true);
+        processScheduler.startProcessor(testProcNode, true);
         assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), LONG_DELAY_TOLERANCE);
-        ps.stopProcessor(testProcNode);
+        processScheduler.stopProcessor(testProcNode);
         assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), SHORT_DELAY_TOLERANCE);
     }
 
@@ -396,12 +388,11 @@ public class ProcessorLifecycleIT {
      */
     @Test
     public void validateProcessorCanBeStoppedWhenOnScheduledConstantlyFails() throws Exception {
-        final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
-        fc = fcsb.getFlowController();
+        final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+        flowManager = fcsb.getFlowManager();
 
-        ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
-        this.setControllerRootGroup(fc, testGroup);
-        ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+        ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
+        ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
                 fcsb.getSystemBundle().getBundleDetails().getCoordinate());
         testProcNode.setProperties(properties);
         TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
@@ -410,12 +401,11 @@ public class ProcessorLifecycleIT {
         this.longRunningOnUnschedule(testProcessor, 100);
         testProcessor.generateExceptionOnScheduled = true;
         testProcessor.keepFailingOnScheduledTimes = Integer.MAX_VALUE;
-        ProcessScheduler ps = fc.getProcessScheduler();
 
         testProcNode.performValidation();
-        ps.startProcessor(testProcNode, true);
+        processScheduler.startProcessor(testProcNode, true);
         assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), SHORT_DELAY_TOLERANCE);
-        ps.stopProcessor(testProcNode);
+        processScheduler.stopProcessor(testProcNode);
         assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), SHORT_DELAY_TOLERANCE);
     }
 
@@ -425,23 +415,21 @@ public class ProcessorLifecycleIT {
      */
     @Test
     public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyInterruptable() throws Exception {
-        final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec");
-        fc = fcsb.getFlowController();
+        final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec");
+        flowManager = fcsb.getFlowManager();
 
-        ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
-        this.setControllerRootGroup(fc, testGroup);
-        ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+        ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
+        ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
                 fcsb.getSystemBundle().getBundleDetails().getCoordinate());
         testProcNode.setProperties(properties);
         TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
         // sets the scenario for the processor to run
         this.blockingInterruptableOnUnschedule(testProcessor);
-        ProcessScheduler ps = fc.getProcessScheduler();
 
         testProcNode.performValidation();
-        ps.startProcessor(testProcNode, true);
+        processScheduler.startProcessor(testProcNode, true);
         assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), SHORT_DELAY_TOLERANCE);
-        ps.stopProcessor(testProcNode);
+        processScheduler.stopProcessor(testProcNode);
         assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), MEDIUM_DELAY_TOLERANCE);
     }
 
@@ -451,23 +439,21 @@ public class ProcessorLifecycleIT {
      */
     @Test
     public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyUninterruptable() throws Exception {
-        final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "1 sec");
-        fc = fcsb.getFlowController();
+        final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "1 sec");
+        flowManager = fcsb.getFlowManager();
 
-        ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
-        this.setControllerRootGroup(fc, testGroup);
-        ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+        ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
+        ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
                 fcsb.getSystemBundle().getBundleDetails().getCoordinate());
         testProcNode.setProperties(properties);
         TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
         // sets the scenario for the processor to run
         this.blockingUninterruptableOnUnschedule(testProcessor);
-        ProcessScheduler ps = fc.getProcessScheduler();
 
         testProcNode.performValidation();
-        ps.startProcessor(testProcNode, true);
+        processScheduler.startProcessor(testProcNode, true);
         assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), MEDIUM_DELAY_TOLERANCE);
-        ps.stopProcessor(testProcNode);
+        processScheduler.stopProcessor(testProcNode);
         assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), MEDIUM_DELAY_TOLERANCE);
     }
 
@@ -477,12 +463,11 @@ public class ProcessorLifecycleIT {
      */
     @Test
     public void validateProcessorCanBeStoppedWhenOnTriggerThrowsException() throws Exception {
-        final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
-        fc = fcsb.getFlowController();
+        final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+        flowManager = fcsb.getFlowManager();
 
-        ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
-        this.setControllerRootGroup(fc, testGroup);
-        ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+        ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
+        ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
                 fcsb.getSystemBundle().getBundleDetails().getCoordinate());
         testProcNode.setProperties(properties);
         TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
@@ -490,14 +475,13 @@ public class ProcessorLifecycleIT {
         // sets the scenario for the processor to run
         this.noop(testProcessor);
         testProcessor.generateExceptionOnTrigger = true;
-        ProcessScheduler ps = fc.getProcessScheduler();
 
         testProcNode.performValidation();
-        ps.startProcessor(testProcNode, true);
+        processScheduler.startProcessor(testProcNode, true);
         assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), LONG_DELAY_TOLERANCE);
-        ps.disableProcessor(testProcNode);
+        processScheduler.disableProcessor(testProcNode);
         assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), LONG_DELAY_TOLERANCE);
-        ps.stopProcessor(testProcNode);
+        processScheduler.stopProcessor(testProcNode);
         assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), LONG_DELAY_TOLERANCE);
     }
 
@@ -507,15 +491,13 @@ public class ProcessorLifecycleIT {
      */
     @Test(expected = IllegalStateException.class)
     public void validateStartFailsOnInvalidProcessorWithMissingProperty() throws Exception {
-        final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
-        fc = fcsb.getFlowController();
+        final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+        flowManager = fcsb.getFlowManager();
 
-        ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
-        this.setControllerRootGroup(fc, testGroup);
-        ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+        ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
+        ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
                 fcsb.getSystemBundle().getBundleDetails().getCoordinate());
-        ProcessScheduler ps = fc.getProcessScheduler();
-        ps.startProcessor(testProcNode, true);
+        processScheduler.startProcessor(testProcNode, true);
         fail();
     }
 
@@ -525,15 +507,14 @@ public class ProcessorLifecycleIT {
      */
     @Test(expected = IllegalStateException.class)
     public void validateStartFailsOnInvalidProcessorWithDisabledService() throws Exception {
-        final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
-        fc = fcsb.getFlowController();
+        final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+        flowManager = fcsb.getFlowManager();
 
-        ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
-        this.setControllerRootGroup(fc, testGroup);
+        ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
 
-        ControllerServiceNode testServiceNode = fc.createControllerService(TestService.class.getName(), "serv",
-                fcsb.getSystemBundle().getBundleDetails().getCoordinate(), null, true);
-        ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+        ControllerServiceNode testServiceNode = flowManager.createControllerService(TestService.class.getName(), "serv",
+                fcsb.getSystemBundle().getBundleDetails().getCoordinate(), null, true, true);
+        ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
                 fcsb.getSystemBundle().getBundleDetails().getCoordinate());
 
         properties.put("S", testServiceNode.getIdentifier());
@@ -542,8 +523,7 @@ public class ProcessorLifecycleIT {
         TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
         testProcessor.withService = true;
 
-        ProcessScheduler ps = fc.getProcessScheduler();
-        ps.startProcessor(testProcNode, true);
+        processScheduler.startProcessor(testProcNode, true);
         fail();
     }
 
@@ -552,17 +532,16 @@ public class ProcessorLifecycleIT {
      */
     @Test
     public void validateStartSucceedsOnProcessorWithEnabledService() throws Exception {
-        final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
-        fc = fcsb.getFlowController();
+        final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+        flowManager = fcsb.getFlowManager();
 
-        ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
-        this.setControllerRootGroup(fc, testGroup);
+        ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
 
-        ControllerServiceNode testServiceNode = fc.createControllerService(TestService.class.getName(), "foo",
-                fcsb.getSystemBundle().getBundleDetails().getCoordinate(), null, true);
+        ControllerServiceNode testServiceNode = flowManager.createControllerService(TestService.class.getName(), "foo",
+                fcsb.getSystemBundle().getBundleDetails().getCoordinate(), null, true, true);
         testGroup.addControllerService(testServiceNode);
 
-        ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+        ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
                 fcsb.getSystemBundle().getBundleDetails().getCoordinate());
         testGroup.addProcessor(testProcNode);
 
@@ -573,12 +552,11 @@ public class ProcessorLifecycleIT {
         testProcessor.withService = true;
         this.noop(testProcessor);
 
-        ProcessScheduler ps = fc.getProcessScheduler();
         testServiceNode.performValidation();
-        ps.enableControllerService(testServiceNode);
+        processScheduler.enableControllerService(testServiceNode);
 
         testProcNode.performValidation();
-        ps.startProcessor(testProcNode, true);
+        processScheduler.startProcessor(testProcNode, true);
 
         Thread.sleep(500);
         assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
@@ -641,7 +619,7 @@ public class ProcessorLifecycleIT {
         testProcessor.setScenario(emptyRunnable, emptyRunnable, emptyRunnable, emptyRunnable);
     }
 
-    private FlowControllerAndSystemBundle buildFlowControllerForTest(final String propKey, final String propValue) throws Exception {
+    private FlowManagerAndSystemBundle buildFlowControllerForTest(final String propKey, final String propValue) throws Exception {
         final Map<String, String> addProps = new HashMap<>();
         addProps.put(NiFiProperties.ADMINISTRATIVE_YIELD_DURATION, "1 sec");
         addProps.put(NiFiProperties.STATE_MANAGEMENT_CONFIG_FILE, "target/test-classes/state-management.xml");
@@ -659,43 +637,33 @@ public class ProcessorLifecycleIT {
         extensionManager.discoverExtensions(systemBundle, Collections.emptySet());
 
         final FlowController flowController = FlowController.createStandaloneInstance(mock(FlowFileEventRepository.class), nifiProperties,
-                mock(Authorizer.class), mock(AuditService.class), null, new VolatileBulletinRepository(),
+            mock(Authorizer.class), mock(AuditService.class), null, new VolatileBulletinRepository(),
             new FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths()),
             mock(FlowRegistryClient.class), extensionManager);
 
-        return new FlowControllerAndSystemBundle(flowController, systemBundle);
+        final FlowManager flowManager = flowController.getFlowManager();
+        this.processScheduler = flowController.getProcessScheduler();
+
+        return new FlowManagerAndSystemBundle(flowManager, systemBundle);
     }
 
-    private FlowControllerAndSystemBundle buildFlowControllerForTest() throws Exception {
+    private FlowManagerAndSystemBundle buildFlowControllerForTest() throws Exception {
         return buildFlowControllerForTest(null, null);
     }
 
-    /**
-     *
-     */
-    private void setControllerRootGroup(FlowController controller, ProcessGroup processGroup) {
-        try {
-            Method m = FlowController.class.getDeclaredMethod("setRootGroup", ProcessGroup.class);
-            m.setAccessible(true);
-            m.invoke(controller, processGroup);
-            controller.initializeFlow();
-        } catch (Exception e) {
-            throw new IllegalStateException("Failed to set root group", e);
-        }
-    }
 
-    private static class FlowControllerAndSystemBundle {
+    private static class FlowManagerAndSystemBundle {
 
-        private final FlowController flowController;
+        private final FlowManager flowManager;
         private final Bundle systemBundle;
 
-        public FlowControllerAndSystemBundle(FlowController flowController, Bundle systemBundle) {
-            this.flowController = flowController;
+        public FlowManagerAndSystemBundle(FlowManager flowManager, Bundle systemBundle) {
+            this.flowManager = flowManager;
             this.systemBundle = systemBundle;
         }
 
-        public FlowController getFlowController() {
-            return flowController;
+        public FlowManager getFlowManager() {
+            return flowManager;
         }
 
         public Bundle getSystemBundle() {
@@ -775,8 +743,7 @@ public class ProcessorLifecycleIT {
                     .identifiesControllerService(ITestservice.class)
                     .build();
 
-            return this.withService ? Arrays.asList(new PropertyDescriptor[]{PROP, SERVICE})
-                    : Arrays.asList(new PropertyDescriptor[]{PROP});
+            return this.withService ? Arrays.asList(PROP, SERVICE) : Arrays.asList(PROP);
         }
 
         @Override
@@ -788,20 +755,15 @@ public class ProcessorLifecycleIT {
         }
     }
 
-    /**
-     */
+
     public static class TestService extends AbstractControllerService implements ITestservice {
 
     }
 
-    /**
-     */
     public static interface ITestservice extends ControllerService {
-
     }
 
-    /**
-     */
+
     private static class EmptyRunnable implements Runnable {
 
         @Override
@@ -810,8 +772,7 @@ public class ProcessorLifecycleIT {
         }
     }
 
-    /**
-     */
+
     private static class BlockingInterruptableRunnable implements Runnable {
 
         @Override
@@ -824,8 +785,7 @@ public class ProcessorLifecycleIT {
         }
     }
 
-    /**
-     */
+
     private static class BlockingUninterruptableRunnable implements Runnable {
 
         @Override
@@ -840,8 +800,7 @@ public class ProcessorLifecycleIT {
         }
     }
 
-    /**
-     */
+
     private static class RandomOrFixedDelayedRunnable implements Runnable {
 
         private final int delayLimit;

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/StandardProcessSchedulerIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/StandardProcessSchedulerIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/StandardProcessSchedulerIT.java
deleted file mode 100644
index fac0272..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/StandardProcessSchedulerIT.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.controller.scheduling;
-
-import org.apache.nifi.bundle.Bundle;
-import org.apache.nifi.components.state.StateManagerProvider;
-import org.apache.nifi.controller.FlowController;
-import org.apache.nifi.controller.service.ControllerServiceNode;
-import org.apache.nifi.controller.service.ControllerServiceState;
-import org.apache.nifi.controller.service.StandardControllerServiceProvider;
-import org.apache.nifi.engine.FlowEngine;
-import org.apache.nifi.nar.ExtensionDiscoveringManager;
-import org.apache.nifi.nar.StandardExtensionDiscoveringManager;
-import org.apache.nifi.nar.SystemBundle;
-import org.apache.nifi.registry.VariableRegistry;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.util.SynchronousValidationTrigger;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.util.Collections;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class StandardProcessSchedulerIT {
-    private final StateManagerProvider stateMgrProvider = Mockito.mock(StateManagerProvider.class);
-    private VariableRegistry variableRegistry = VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY;
-    private FlowController controller;
-    private NiFiProperties nifiProperties;
-    private Bundle systemBundle;
-    private ExtensionDiscoveringManager extensionManager;
-    private volatile String propsFile = TestStandardProcessScheduler.class.getResource("/standardprocessschedulertest.nifi.properties").getFile();
-
-    @Before
-    public void setup() throws InitializationException {
-        this.nifiProperties = NiFiProperties.createBasicNiFiProperties(propsFile, null);
-
-        // load the system bundle
-        systemBundle = SystemBundle.create(nifiProperties);
-        extensionManager = new StandardExtensionDiscoveringManager();
-        extensionManager.discoverExtensions(systemBundle, Collections.emptySet());
-
-        controller = Mockito.mock(FlowController.class);
-        Mockito.when(controller.getExtensionManager()).thenReturn(extensionManager);
-    }
-
-    /**
-     * Validates that the service that is currently in ENABLING state can be
-     * disabled and that its @OnDisabled operation will be invoked as soon as
-     *
-     * @OnEnable finishes.
-     */
-    @Test
-    public void validateLongEnablingServiceCanStillBeDisabled() throws Exception {
-        final StandardProcessScheduler scheduler = new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), null, null, stateMgrProvider, nifiProperties);
-        final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null,
-            stateMgrProvider, variableRegistry, nifiProperties, new SynchronousValidationTrigger());
-
-        final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(),
-            "1", systemBundle.getBundleDetails().getCoordinate(), null, false);
-
-        final LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation();
-        ts.setLimit(3000);
-        scheduler.enableControllerService(serviceNode);
-        Thread.sleep(2000);
-        assertTrue(serviceNode.isActive());
-        assertEquals(1, ts.enableInvocationCount());
-
-        Thread.sleep(500);
-        scheduler.disableControllerService(serviceNode);
-        assertFalse(serviceNode.isActive());
-        assertEquals(ControllerServiceState.DISABLING, serviceNode.getState());
-        assertEquals(0, ts.disableInvocationCount());
-        // wait a bit. . . Enabling will finish and @OnDisabled will be invoked
-        // automatically
-        Thread.sleep(4000);
-        assertEquals(ControllerServiceState.DISABLED, serviceNode.getState());
-        assertEquals(1, ts.disableInvocationCount());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
index 92e6e5d..920999e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
@@ -21,22 +21,30 @@ import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ExtensionBuilder;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.LoggableComponent;
+import org.apache.nifi.controller.NodeTypeProvider;
+import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReloadComponent;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.StandardProcessorNode;
 import org.apache.nifi.controller.TerminationAwareLogger;
 import org.apache.nifi.controller.ValidationContextFactory;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.kerberos.KerberosConfig;
 import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
 import org.apache.nifi.controller.reporting.StandardReportingTaskNode;
 import org.apache.nifi.controller.scheduling.processors.FailOnScheduledProcessor;
 import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.controller.service.StandardControllerServiceNode;
 import org.apache.nifi.controller.service.StandardControllerServiceProvider;
@@ -93,6 +101,12 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anySet;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
 
 public class TestStandardProcessScheduler {
 
@@ -102,10 +116,13 @@ public class TestStandardProcessScheduler {
     private final StateManagerProvider stateMgrProvider = Mockito.mock(StateManagerProvider.class);
     private VariableRegistry variableRegistry = VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY;
     private FlowController controller;
+    private FlowManager flowManager;
     private ProcessGroup rootGroup;
     private NiFiProperties nifiProperties;
     private Bundle systemBundle;
     private ExtensionDiscoveringManager extensionManager;
+    private ControllerServiceProvider serviceProvider;
+
     private volatile String propsFile = TestStandardProcessScheduler.class.getResource("/standardprocessschedulertest.nifi.properties").getFile();
 
     @Before
@@ -125,19 +142,23 @@ public class TestStandardProcessScheduler {
 
         reportingTask = new TestReportingTask();
         final ReportingInitializationContext config = new StandardReportingInitializationContext(UUID.randomUUID().toString(), "Test", SchedulingStrategy.TIMER_DRIVEN, "5 secs",
-                Mockito.mock(ComponentLog.class), null, nifiProperties, null);
+                Mockito.mock(ComponentLog.class), null, KerberosConfig.NOT_CONFIGURED, null);
         reportingTask.initialize(config);
 
         final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(null, variableRegistry);
         final TerminationAwareLogger logger = Mockito.mock(TerminationAwareLogger.class);
         final ReloadComponent reloadComponent = Mockito.mock(ReloadComponent.class);
         final LoggableComponent<ReportingTask> loggableComponent = new LoggableComponent<>(reportingTask, systemBundle.getBundleDetails().getCoordinate(), logger);
-        taskNode = new StandardReportingTaskNode(loggableComponent, UUID.randomUUID().toString(), null, scheduler, validationContextFactory,
+        taskNode = new StandardReportingTaskNode(loggableComponent, UUID.randomUUID().toString(), Mockito.mock(FlowController.class), scheduler, validationContextFactory,
             new StandardComponentVariableRegistry(variableRegistry), reloadComponent, extensionManager, new SynchronousValidationTrigger());
 
+        flowManager = Mockito.mock(FlowManager.class);
         controller = Mockito.mock(FlowController.class);
+        when(controller.getFlowManager()).thenReturn(flowManager);
         Mockito.when(controller.getExtensionManager()).thenReturn(extensionManager);
 
+        serviceProvider = new StandardControllerServiceProvider(controller, scheduler, null);
+
         final ConcurrentMap<String, ProcessorNode> processorMap = new ConcurrentHashMap<>();
         Mockito.doAnswer(new Answer<ProcessorNode>() {
             @Override
@@ -145,7 +166,7 @@ public class TestStandardProcessScheduler {
                 final String id = invocation.getArgumentAt(0, String.class);
                 return processorMap.get(id);
             }
-        }).when(controller).getProcessorNode(Mockito.anyString());
+        }).when(flowManager).getProcessorNode(Mockito.anyString());
 
         Mockito.doAnswer(new Answer<Object>() {
             @Override
@@ -154,10 +175,40 @@ public class TestStandardProcessScheduler {
                 processorMap.putIfAbsent(procNode.getIdentifier(), procNode);
                 return null;
             }
-        }).when(controller).onProcessorAdded(Mockito.any(ProcessorNode.class));
+        }).when(flowManager).onProcessorAdded(any(ProcessorNode.class));
+
+        when(controller.getControllerServiceProvider()).thenReturn(serviceProvider);
 
         rootGroup = new MockProcessGroup(controller);
-        Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(rootGroup);
+        when(flowManager.getGroup(Mockito.anyString())).thenReturn(rootGroup);
+
+        when(controller.getReloadComponent()).thenReturn(Mockito.mock(ReloadComponent.class));
+
+        doAnswer(new Answer<ControllerServiceNode>() {
+            @Override
+            public ControllerServiceNode answer(final InvocationOnMock invocation) throws Throwable {
+                final String type = invocation.getArgumentAt(0, String.class);
+                final String id = invocation.getArgumentAt(1, String.class);
+                final BundleCoordinate bundleCoordinate = invocation.getArgumentAt(2, BundleCoordinate.class);
+
+                final ControllerServiceNode serviceNode = new ExtensionBuilder()
+                    .identifier(id)
+                    .type(type)
+                    .bundleCoordinate(bundleCoordinate)
+                    .controllerServiceProvider(serviceProvider)
+                    .processScheduler(Mockito.mock(ProcessScheduler.class))
+                    .nodeTypeProvider(Mockito.mock(NodeTypeProvider.class))
+                    .validationTrigger(Mockito.mock(ValidationTrigger.class))
+                    .reloadComponent(Mockito.mock(ReloadComponent.class))
+                    .variableRegistry(variableRegistry)
+                    .stateManagerProvider(Mockito.mock(StateManagerProvider.class))
+                    .extensionManager(extensionManager)
+                    .buildControllerService();
+
+                serviceProvider.onControllerServiceAdded(serviceNode);
+                return serviceNode;
+            }
+        }).when(flowManager).createControllerService(anyString(), anyString(), any(BundleCoordinate.class), anySet(), anyBoolean(), anyBoolean());
     }
 
     @After
@@ -196,21 +247,20 @@ public class TestStandardProcessScheduler {
     public void testDisableControllerServiceWithProcessorTryingToStartUsingIt() throws InterruptedException {
         final String uuid = UUID.randomUUID().toString();
         final Processor proc = new ServiceReferencingProcessor();
-        proc.initialize(new StandardProcessorInitializationContext(uuid, null, null, null, null));
+        proc.initialize(new StandardProcessorInitializationContext(uuid, null, null, null, KerberosConfig.NOT_CONFIGURED));
 
         final ReloadComponent reloadComponent = Mockito.mock(ReloadComponent.class);
 
-        final StandardControllerServiceProvider serviceProvider = new StandardControllerServiceProvider(controller, scheduler, null,
-            Mockito.mock(StateManagerProvider.class), variableRegistry, nifiProperties, new SynchronousValidationTrigger());
-        final ControllerServiceNode service = serviceProvider.createControllerService(NoStartServiceImpl.class.getName(), "service",
-                systemBundle.getBundleDetails().getCoordinate(), null, true);
+        final ControllerServiceNode service = flowManager.createControllerService(NoStartServiceImpl.class.getName(), "service",
+                systemBundle.getBundleDetails().getCoordinate(), null, true, true);
+
         rootGroup.addControllerService(service);
 
         final LoggableComponent<Processor> loggableComponent = new LoggableComponent<>(proc, systemBundle.getBundleDetails().getCoordinate(), null);
-        final ProcessorNode procNode = new StandardProcessorNode(loggableComponent, uuid,
-                new StandardValidationContextFactory(serviceProvider, variableRegistry),
-            scheduler, serviceProvider, nifiProperties, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY),
-                reloadComponent, extensionManager, new SynchronousValidationTrigger());
+        final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(serviceProvider, variableRegistry);
+        final ProcessorNode procNode = new StandardProcessorNode(loggableComponent, uuid, validationContextFactory, scheduler,
+            serviceProvider, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent, extensionManager, new SynchronousValidationTrigger());
+
         rootGroup.addProcessor(procNode);
 
         Map<String, String> procProps = new HashMap<>();
@@ -288,11 +338,9 @@ public class TestStandardProcessScheduler {
     @Test
     public void validateServiceEnablementLogicHappensOnlyOnce() throws Exception {
         final StandardProcessScheduler scheduler = createScheduler();
-        final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null,
-            stateMgrProvider, variableRegistry, nifiProperties, new SynchronousValidationTrigger());
 
-        final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
-                "1", systemBundle.getBundleDetails().getCoordinate(), null, false);
+        final ControllerServiceNode serviceNode = flowManager.createControllerService(SimpleTestService.class.getName(),
+                "1", systemBundle.getBundleDetails().getCoordinate(), null, false, true);
 
         assertFalse(serviceNode.isActive());
         final SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
@@ -330,11 +378,9 @@ public class TestStandardProcessScheduler {
     @Test
     public void validateDisabledServiceCantBeDisabled() throws Exception {
         final StandardProcessScheduler scheduler = createScheduler();
-        final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider,
-            variableRegistry, nifiProperties, new SynchronousValidationTrigger());
 
-        final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
-                "1", systemBundle.getBundleDetails().getCoordinate(), null, false);
+        final ControllerServiceNode serviceNode = flowManager.createControllerService(SimpleTestService.class.getName(),
+                "1", systemBundle.getBundleDetails().getCoordinate(), null, false, true);
         final SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
         final ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -370,10 +416,9 @@ public class TestStandardProcessScheduler {
     @Test
     public void validateEnabledServiceCanOnlyBeDisabledOnce() throws Exception {
         final StandardProcessScheduler scheduler = createScheduler();
-        final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider,
-            variableRegistry, nifiProperties, new SynchronousValidationTrigger());
-        final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
-                "1", systemBundle.getBundleDetails().getCoordinate(), null, false);
+        final ControllerServiceNode serviceNode = flowManager.createControllerService(SimpleTestService.class.getName(),
+                "1", systemBundle.getBundleDetails().getCoordinate(), null, false, true);
+
         final SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
         scheduler.enableControllerService(serviceNode);
         assertTrue(serviceNode.isActive());
@@ -405,10 +450,9 @@ public class TestStandardProcessScheduler {
     @Test
     public void validateDisablingOfTheFailedService() throws Exception {
         final StandardProcessScheduler scheduler = createScheduler();
-        final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null,
-            stateMgrProvider, variableRegistry, nifiProperties, new SynchronousValidationTrigger());
-        final ControllerServiceNode serviceNode = provider.createControllerService(FailingService.class.getName(),
-                "1", systemBundle.getBundleDetails().getCoordinate(), null, false);
+
+        final ControllerServiceNode serviceNode = flowManager.createControllerService(FailingService.class.getName(),
+                "1", systemBundle.getBundleDetails().getCoordinate(), null, false, true);
         scheduler.enableControllerService(serviceNode);
         Thread.sleep(1000);
         scheduler.shutdown();
@@ -438,12 +482,11 @@ public class TestStandardProcessScheduler {
     @Ignore
     public void validateEnabledDisableMultiThread() throws Exception {
         final StandardProcessScheduler scheduler = createScheduler();
-        final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider,
-            variableRegistry, nifiProperties, new SynchronousValidationTrigger());
+        final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null);
         final ExecutorService executor = Executors.newCachedThreadPool();
         for (int i = 0; i < 200; i++) {
-            final ControllerServiceNode serviceNode = provider.createControllerService(RandomShortDelayEnablingService.class.getName(), "1",
-                    systemBundle.getBundleDetails().getCoordinate(), null, false);
+            final ControllerServiceNode serviceNode = flowManager.createControllerService(RandomShortDelayEnablingService.class.getName(), "1",
+                    systemBundle.getBundleDetails().getCoordinate(), null, false, true);
 
             executor.execute(new Runnable() {
                 @Override
@@ -482,10 +525,9 @@ public class TestStandardProcessScheduler {
     @Test
     public void validateNeverEnablingServiceCanStillBeDisabled() throws Exception {
         final StandardProcessScheduler scheduler = createScheduler();
-        final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null,
-            stateMgrProvider, variableRegistry, nifiProperties, new SynchronousValidationTrigger());
-        final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(),
-                "1", systemBundle.getBundleDetails().getCoordinate(), null, false);
+
+        final ControllerServiceNode serviceNode = flowManager.createControllerService(LongEnablingService.class.getName(),
+                "1", systemBundle.getBundleDetails().getCoordinate(), null, false, true);
         final LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation();
         ts.setLimit(Long.MAX_VALUE);
         scheduler.enableControllerService(serviceNode);
@@ -506,14 +548,13 @@ public class TestStandardProcessScheduler {
         final FailOnScheduledProcessor proc = new FailOnScheduledProcessor();
         proc.setDesiredFailureCount(3);
 
-        proc.initialize(new StandardProcessorInitializationContext(UUID.randomUUID().toString(), null, null, null, nifiProperties));
+        proc.initialize(new StandardProcessorInitializationContext(UUID.randomUUID().toString(), null, null, null, KerberosConfig.NOT_CONFIGURED));
         final ReloadComponent reloadComponent = Mockito.mock(ReloadComponent.class);
         final LoggableComponent<Processor> loggableComponent = new LoggableComponent<>(proc, systemBundle.getBundleDetails().getCoordinate(), null);
 
         final ProcessorNode procNode = new StandardProcessorNode(loggableComponent, UUID.randomUUID().toString(),
-            new StandardValidationContextFactory(controller, variableRegistry),
-            scheduler, controller, nifiProperties, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY),
-                reloadComponent, extensionManager, new SynchronousValidationTrigger());
+            new StandardValidationContextFactory(serviceProvider, variableRegistry),
+            scheduler, serviceProvider, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent, extensionManager, new SynchronousValidationTrigger());
 
         procNode.performValidation();
         rootGroup.addProcessor(procNode);
@@ -527,20 +568,19 @@ public class TestStandardProcessScheduler {
     }
 
     // Test that if processor times out in the @OnScheduled but responds to interrupt, it keeps getting scheduled
-    @Test(timeout = 1000000)
+    @Test(timeout = 10000)
     public void testProcessorTimeOutRespondsToInterrupt() throws InterruptedException {
         final FailOnScheduledProcessor proc = new FailOnScheduledProcessor();
         proc.setDesiredFailureCount(0);
         proc.setOnScheduledSleepDuration(20, TimeUnit.MINUTES, true, 1);
 
-        proc.initialize(new StandardProcessorInitializationContext(UUID.randomUUID().toString(), null, null, null, nifiProperties));
+        proc.initialize(new StandardProcessorInitializationContext(UUID.randomUUID().toString(), null, null, null, KerberosConfig.NOT_CONFIGURED));
         final ReloadComponent reloadComponent = Mockito.mock(ReloadComponent.class);
         final LoggableComponent<Processor> loggableComponent = new LoggableComponent<>(proc, systemBundle.getBundleDetails().getCoordinate(), null);
 
         final ProcessorNode procNode = new StandardProcessorNode(loggableComponent, UUID.randomUUID().toString(),
-            new StandardValidationContextFactory(controller, variableRegistry),
-            scheduler, controller, nifiProperties, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY),
-                reloadComponent, extensionManager, new SynchronousValidationTrigger());
+            new StandardValidationContextFactory(serviceProvider, variableRegistry),
+            scheduler, serviceProvider, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent, extensionManager, new SynchronousValidationTrigger());
 
         rootGroup.addProcessor(procNode);
 
@@ -563,14 +603,13 @@ public class TestStandardProcessScheduler {
         proc.setDesiredFailureCount(0);
         proc.setOnScheduledSleepDuration(20, TimeUnit.MINUTES, false, 1);
 
-        proc.initialize(new StandardProcessorInitializationContext(UUID.randomUUID().toString(), null, null, null, nifiProperties));
+        proc.initialize(new StandardProcessorInitializationContext(UUID.randomUUID().toString(), null, null, null, KerberosConfig.NOT_CONFIGURED));
         final ReloadComponent reloadComponent = Mockito.mock(ReloadComponent.class);
         final LoggableComponent<Processor> loggableComponent = new LoggableComponent<>(proc, systemBundle.getBundleDetails().getCoordinate(), null);
 
         final ProcessorNode procNode = new StandardProcessorNode(loggableComponent, UUID.randomUUID().toString(),
-            new StandardValidationContextFactory(controller, variableRegistry),
-            scheduler, controller, nifiProperties, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY),
-                reloadComponent, extensionManager, new SynchronousValidationTrigger());
+            new StandardValidationContextFactory(serviceProvider, variableRegistry),
+            scheduler, serviceProvider, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent, extensionManager, new SynchronousValidationTrigger());
 
         rootGroup.addProcessor(procNode);
 
@@ -632,6 +671,6 @@ public class TestStandardProcessScheduler {
     }
 
     private StandardProcessScheduler createScheduler() {
-        return new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), null, null, stateMgrProvider, nifiProperties);
+        return new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), Mockito.mock(FlowController.class), null, stateMgrProvider, nifiProperties);
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java
index f019257..196e10e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java
@@ -102,9 +102,11 @@ public class StandardFlowSerializerTest {
 
     @Test
     public void testSerializationEscapingAndFiltering() throws Exception {
-        final ProcessorNode dummy = controller.createProcessor(DummyScheduledProcessor.class.getName(), UUID.randomUUID().toString(), systemBundle.getBundleDetails().getCoordinate());
+        final ProcessorNode dummy = controller.getFlowManager().createProcessor(DummyScheduledProcessor.class.getName(),
+            UUID.randomUUID().toString(), systemBundle.getBundleDetails().getCoordinate());
+
         dummy.setComments(RAW_COMMENTS);
-        controller.getRootGroup().addProcessor(dummy);
+        controller.getFlowManager().getRootGroup().addProcessor(dummy);
 
         // serialize the controller
         final ByteArrayOutputStream os = new ByteArrayOutputStream();

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderIT.java
index 0f4d3ce..2bcf3d9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderIT.java
@@ -18,9 +18,16 @@
 package org.apache.nifi.controller.service;
 
 import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.ValidationTrigger;
+import org.apache.nifi.controller.ExtensionBuilder;
 import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.NodeTypeProvider;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ReloadComponent;
+import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
 import org.apache.nifi.controller.service.mock.MockProcessGroup;
 import org.apache.nifi.controller.service.mock.ServiceA;
@@ -32,7 +39,6 @@ import org.apache.nifi.nar.StandardExtensionDiscoveringManager;
 import org.apache.nifi.nar.SystemBundle;
 import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.util.SynchronousValidationTrigger;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -95,20 +101,45 @@ public class StandardControllerServiceProviderIT {
      */
     @Test(timeout = 120000)
     public void testConcurrencyWithEnablingReferencingServicesGraph() throws InterruptedException, ExecutionException {
-        final StandardProcessScheduler scheduler = new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), null, null, stateManagerProvider, niFiProperties);
+        final StandardProcessScheduler scheduler = new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), Mockito.mock(FlowController.class),
+            null, stateManagerProvider, niFiProperties);
+
         for (int i = 0; i < 5000; i++) {
             testEnableReferencingServicesGraph(scheduler);
         }
     }
 
+    private ControllerServiceNode createControllerService(final String type, final String id, final BundleCoordinate bundleCoordinate, final ControllerServiceProvider serviceProvider) {
+        final ControllerServiceNode serviceNode = new ExtensionBuilder()
+            .identifier(id)
+            .type(type)
+            .bundleCoordinate(bundleCoordinate)
+            .controllerServiceProvider(serviceProvider)
+            .processScheduler(Mockito.mock(ProcessScheduler.class))
+            .nodeTypeProvider(Mockito.mock(NodeTypeProvider.class))
+            .validationTrigger(Mockito.mock(ValidationTrigger.class))
+            .reloadComponent(Mockito.mock(ReloadComponent.class))
+            .variableRegistry(variableRegistry)
+            .stateManagerProvider(Mockito.mock(StateManagerProvider.class))
+            .extensionManager(extensionManager)
+            .buildControllerService();
+
+        serviceProvider.onControllerServiceAdded(serviceNode);
+
+        return serviceNode;
+    }
+
     public void testEnableReferencingServicesGraph(final StandardProcessScheduler scheduler) throws InterruptedException, ExecutionException {
         final FlowController controller = Mockito.mock(FlowController.class);
+
         final ProcessGroup procGroup = new MockProcessGroup(controller);
-        Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
+        final FlowManager flowManager = Mockito.mock(FlowManager.class);
+        Mockito.when(controller.getFlowManager()).thenReturn(flowManager);
+
+        Mockito.when(flowManager.getGroup(Mockito.anyString())).thenReturn(procGroup);
         Mockito.when(controller.getExtensionManager()).thenReturn(extensionManager);
 
-        final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null,
-            stateManagerProvider, variableRegistry, niFiProperties, new SynchronousValidationTrigger());
+        final ControllerServiceProvider serviceProvider = new StandardControllerServiceProvider(controller, scheduler, null);
 
         // build a graph of controller services with dependencies as such:
         //
@@ -122,14 +153,10 @@ public class StandardControllerServiceProviderIT {
         // So we have to verify that if D is enabled, when we enable its referencing services,
         // we enable C and B, even if we attempt to enable C before B... i.e., if we try to enable C, we cannot do so
         // until B is first enabled so ensure that we enable B first.
-        final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1",
-            systemBundle.getBundleDetails().getCoordinate(), null, false);
-        final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2",
-            systemBundle.getBundleDetails().getCoordinate(), null, false);
-        final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3",
-            systemBundle.getBundleDetails().getCoordinate(), null, false);
-        final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4",
-            systemBundle.getBundleDetails().getCoordinate(), null, false);
+        final ControllerServiceNode serviceNode1 = createControllerService(ServiceA.class.getName(), "1", systemBundle.getBundleDetails().getCoordinate(), serviceProvider);
+        final ControllerServiceNode serviceNode2 = createControllerService(ServiceA.class.getName(), "2", systemBundle.getBundleDetails().getCoordinate(), serviceProvider);
+        final ControllerServiceNode serviceNode3 = createControllerService(ServiceA.class.getName(), "3", systemBundle.getBundleDetails().getCoordinate(), serviceProvider);
+        final ControllerServiceNode serviceNode4 = createControllerService(ServiceB.class.getName(), "4", systemBundle.getBundleDetails().getCoordinate(), serviceProvider);
 
         procGroup.addControllerService(serviceNode1);
         procGroup.addControllerService(serviceNode2);
@@ -141,8 +168,9 @@ public class StandardControllerServiceProviderIT {
         setProperty(serviceNode3, ServiceA.OTHER_SERVICE.getName(), "2");
         setProperty(serviceNode3, ServiceA.OTHER_SERVICE_2.getName(), "4");
 
-        provider.enableControllerService(serviceNode4).get();
-        provider.enableReferencingServices(serviceNode4);
+        serviceNode4.performValidation();
+        serviceProvider.enableControllerService(serviceNode4).get();
+        serviceProvider.enableReferencingServices(serviceNode4);
 
         // Verify that the services are either ENABLING or ENABLED, and wait for all of them to become ENABLED.
         // Note that we set a timeout of 10 seconds, in case a bug occurs and the services never become ENABLED.

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
index 0a0b05f..f70ce6e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
@@ -17,10 +17,15 @@
 package org.apache.nifi.controller.service;
 
 import org.apache.nifi.bundle.Bundle;
-import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ExtensionBuilder;
 import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.NodeTypeProvider;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ReloadComponent;
 import org.apache.nifi.nar.ExtensionDiscoveringManager;
 import org.apache.nifi.nar.StandardExtensionDiscoveringManager;
 import org.apache.nifi.nar.SystemBundle;
@@ -28,7 +33,6 @@ import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.registry.variable.FileBasedVariableRegistry;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.util.SynchronousValidationTrigger;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -37,6 +41,7 @@ import org.mockito.Mockito;
 import java.util.Collections;
 
 
+
 public class StandardControllerServiceProviderTest {
 
     private ControllerService proxied;
@@ -48,7 +53,7 @@ public class StandardControllerServiceProviderTest {
     private static FlowController flowController;
 
     @BeforeClass
-    public static void setupSuite() throws Exception {
+    public static void setupSuite() {
         System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, StandardControllerServiceProviderTest.class.getResource("/conf/nifi.properties").getFile());
         nifiProperties = NiFiProperties.createBasicNiFiProperties(null, null);
 
@@ -63,37 +68,35 @@ public class StandardControllerServiceProviderTest {
         Mockito.when(flowController.getExtensionManager()).thenReturn(extensionManager);
     }
 
+
     @Before
     public void setup() throws Exception {
         String id = "id";
         String clazz = "org.apache.nifi.controller.service.util.TestControllerService";
-        ControllerServiceProvider provider = new StandardControllerServiceProvider(flowController, null, null, new StateManagerProvider() {
-            @Override
-            public StateManager getStateManager(final String componentId) {
-                return Mockito.mock(StateManager.class);
-            }
-
-            @Override
-            public void shutdown() {
-            }
-
-            @Override
-            public void enableClusterProvider() {
-            }
-
-            @Override
-            public void disableClusterProvider() {
-            }
-
-            @Override
-            public void onComponentRemoved(String componentId) {
-            }
-        }, variableRegistry, nifiProperties, new SynchronousValidationTrigger());
-        ControllerServiceNode node = provider.createControllerService(clazz, id, systemBundle.getBundleDetails().getCoordinate(), null, true);
+        ControllerServiceProvider provider = new StandardControllerServiceProvider(Mockito.mock(FlowController.class), null, null);
+        ControllerServiceNode node = createControllerService(clazz, id, systemBundle.getBundleDetails().getCoordinate(), provider);
         proxied = node.getProxiedControllerService();
         implementation = node.getControllerServiceImplementation();
     }
 
+    private ControllerServiceNode createControllerService(final String type, final String id, final BundleCoordinate bundleCoordinate, final ControllerServiceProvider serviceProvider) {
+        final ControllerServiceNode serviceNode = new ExtensionBuilder()
+            .identifier(id)
+            .type(type)
+            .bundleCoordinate(bundleCoordinate)
+            .controllerServiceProvider(serviceProvider)
+            .processScheduler(Mockito.mock(ProcessScheduler.class))
+            .nodeTypeProvider(Mockito.mock(NodeTypeProvider.class))
+            .validationTrigger(Mockito.mock(ValidationTrigger.class))
+            .reloadComponent(Mockito.mock(ReloadComponent.class))
+            .variableRegistry(variableRegistry)
+            .stateManagerProvider(Mockito.mock(StateManagerProvider.class))
+            .extensionManager(extensionManager)
+            .buildControllerService();
+
+        return serviceNode;
+    }
+
     @Test(expected = UnsupportedOperationException.class)
     public void testCallProxiedOnPropertyModified() {
         proxied.onPropertyModified(null, "oldValue", "newValue");