You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/11/06 16:24:04 UTC
[3/9] nifi git commit: NIFI-5769: Refactored FlowController to use
Composition over Inheritance - Ensure that when root group is set,
that we register its ID in FlowManager
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
index fa164fc..55b015d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
@@ -33,6 +33,7 @@ import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.events.VolatileBulletinRepository;
@@ -57,7 +58,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -89,9 +89,10 @@ public class ProcessorLifecycleIT {
private static final long MEDIUM_DELAY_TOLERANCE = 15000L;
private static final long LONG_DELAY_TOLERANCE = 20000L;
- private FlowController fc;
+ private FlowManager flowManager;
private Map<String, String> properties = new HashMap<>();
private volatile String propsFile = "src/test/resources/lifecycletest.nifi.properties";
+ private ProcessScheduler processScheduler;
@Before
public void before() throws Exception {
@@ -100,7 +101,11 @@ public class ProcessorLifecycleIT {
@After
public void after() throws Exception {
- fc.shutdown(true);
+ if (processScheduler != null) {
+ processScheduler.shutdown();
+ processScheduler = null;
+ }
+
FileUtils.deleteDirectory(new File("./target/lifecycletest"));
}
@@ -123,11 +128,10 @@ public class ProcessorLifecycleIT {
@Test
public void validateEnableOperation() throws Exception {
- final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
- fc = fcsb.getFlowController();
- ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
- this.setControllerRootGroup(fc, testGroup);
- final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(),
+ final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+ flowManager = fcsb.getFlowManager();
+ ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
+ final ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(),
UUID.randomUUID().toString(), fcsb.getSystemBundle().getBundleDetails().getCoordinate());
assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState());
@@ -145,12 +149,11 @@ public class ProcessorLifecycleIT {
@Test
public void validateDisableOperation() throws Exception {
- final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
- fc = fcsb.getFlowController();
+ final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+ flowManager = fcsb.getFlowManager();
- ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
- this.setControllerRootGroup(fc, testGroup);
- final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(),
+ ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
+ final ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(),
UUID.randomUUID().toString(), fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testProcNode.setProperties(properties);
assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState());
@@ -162,9 +165,8 @@ public class ProcessorLifecycleIT {
assertCondition(() -> ScheduledState.DISABLED == testProcNode.getScheduledState());
assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState());
- ProcessScheduler ps = fc.getProcessScheduler();
testProcNode.performValidation();
- ps.startProcessor(testProcNode, true);
+ processScheduler.startProcessor(testProcNode, true);
assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState());
}
@@ -174,24 +176,22 @@ public class ProcessorLifecycleIT {
*/
@Test
public void validateIdempotencyOfProcessorStartOperation() throws Exception {
- final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
- fc = fcsb.getFlowController();
+ final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+ flowManager = fcsb.getFlowManager();
- ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
- this.setControllerRootGroup(fc, testGroup);
- final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+ ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
+ final ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
// sets the scenario for the processor to run
this.noop(testProcessor);
- final ProcessScheduler ps = fc.getProcessScheduler();
testProcNode.performValidation();
- ps.startProcessor(testProcNode, true);
- ps.startProcessor(testProcNode, true);
- ps.startProcessor(testProcNode, true);
+ processScheduler.startProcessor(testProcNode, true);
+ processScheduler.startProcessor(testProcNode, true);
+ processScheduler.startProcessor(testProcNode, true);
Thread.sleep(500);
assertCondition(() -> testProcessor.operationNames.size() == 1);
@@ -204,11 +204,10 @@ public class ProcessorLifecycleIT {
*/
@Test
public void validateStopCallsAreMeaninglessIfProcessorNotStarted() throws Exception {
- final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
- fc = fcsb.getFlowController();
- ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
- this.setControllerRootGroup(fc, testGroup);
- final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+ final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+ flowManager = fcsb.getFlowManager();
+ ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
+ final ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
@@ -216,8 +215,8 @@ public class ProcessorLifecycleIT {
// sets the scenario for the processor to run
int randomDelayLimit = 3000;
this.randomOnTriggerDelay(testProcessor, randomDelayLimit);
- final ProcessScheduler ps = fc.getProcessScheduler();
- ps.stopProcessor(testProcNode);
+
+ processScheduler.stopProcessor(testProcNode);
assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState());
assertTrue(testProcessor.operationNames.isEmpty());
}
@@ -229,11 +228,10 @@ public class ProcessorLifecycleIT {
@Test
@Ignore
public void validateSuccessfulAndOrderlyShutdown() throws Exception {
- final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
- fc = fcsb.getFlowController();
- ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
- this.setControllerRootGroup(fc, testGroup);
- ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+ final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+ flowManager = fcsb.getFlowManager();
+ ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
+ ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
@@ -248,10 +246,10 @@ public class ProcessorLifecycleIT {
testGroup.addProcessor(testProcNode);
- fc.startProcessGroup(testGroup.getIdentifier());
+ flowManager.getGroup(testGroup.getIdentifier()).startProcessing();
assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), SHORT_DELAY_TOLERANCE);
- fc.stopAllProcessors();
+ flowManager.getRootGroup().stopProcessing();
Thread.sleep(randomDelayLimit); // up to randomDelayLimit, otherwise next assertion may fail as the processor still executing
@@ -273,12 +271,11 @@ public class ProcessorLifecycleIT {
@Test
@Ignore
public void validateLifecycleOperationOrderWithConcurrentCallsToStartStop() throws Exception {
- final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
- fc = fcsb.getFlowController();
+ final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+ flowManager = fcsb.getFlowManager();
- ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
- this.setControllerRootGroup(fc, testGroup);
- final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+ ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
+ final ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
@@ -286,7 +283,6 @@ public class ProcessorLifecycleIT {
// sets the scenario for the processor to run
this.noop(testProcessor);
- final ProcessScheduler ps = fc.getProcessScheduler();
ExecutorService executor = Executors.newFixedThreadPool(100);
int startCallsCount = 10000;
final CountDownLatch countDownCounter = new CountDownLatch(startCallsCount);
@@ -297,7 +293,7 @@ public class ProcessorLifecycleIT {
@Override
public void run() {
LockSupport.parkNanos(random.nextInt(9000000));
- ps.stopProcessor(testProcNode);
+ processScheduler.stopProcessor(testProcNode);
countDownCounter.countDown();
}
});
@@ -307,7 +303,7 @@ public class ProcessorLifecycleIT {
@Override
public void run() {
LockSupport.parkNanos(random.nextInt(9000000));
- ps.startProcessor(testProcNode, true);
+ processScheduler.startProcessor(testProcNode, true);
countDownCounter.countDown();
}
});
@@ -332,12 +328,11 @@ public class ProcessorLifecycleIT {
*/
@Test
public void validateProcessorUnscheduledAndStoppedWhenStopIsCalledBeforeProcessorFullyStarted() throws Exception {
- final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
- fc = fcsb.getFlowController();
+ final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+ flowManager = fcsb.getFlowManager();
- ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
- this.setControllerRootGroup(fc, testGroup);
- ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+ ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
+ ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
@@ -345,13 +340,12 @@ public class ProcessorLifecycleIT {
// sets the scenario for the processor to run
int delay = 200;
this.longRunningOnSchedule(testProcessor, delay);
- ProcessScheduler ps = fc.getProcessScheduler();
testProcNode.performValidation();
- ps.startProcessor(testProcNode, true);
+ processScheduler.startProcessor(testProcNode, true);
assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), MEDIUM_DELAY_TOLERANCE);
- ps.stopProcessor(testProcNode);
+ processScheduler.stopProcessor(testProcNode);
assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), MEDIUM_DELAY_TOLERANCE);
assertCondition(() -> testProcessor.operationNames.size() == 3, LONG_DELAY_TOLERANCE);
assertEquals("@OnScheduled", testProcessor.operationNames.get(0));
@@ -366,12 +360,11 @@ public class ProcessorLifecycleIT {
*/
@Test
public void validateProcessScheduledAfterAdministrativeDelayDueToTheOnScheduledException() throws Exception {
- final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
- fc = fcsb.getFlowController();
+ final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+ flowManager = fcsb.getFlowManager();
- ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
- this.setControllerRootGroup(fc, testGroup);
- ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+ ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
+ ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
@@ -380,12 +373,11 @@ public class ProcessorLifecycleIT {
this.noop(testProcessor);
testProcessor.generateExceptionOnScheduled = true;
testProcessor.keepFailingOnScheduledTimes = 2;
- ProcessScheduler ps = fc.getProcessScheduler();
testProcNode.performValidation();
- ps.startProcessor(testProcNode, true);
+ processScheduler.startProcessor(testProcNode, true);
assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), LONG_DELAY_TOLERANCE);
- ps.stopProcessor(testProcNode);
+ processScheduler.stopProcessor(testProcNode);
assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), SHORT_DELAY_TOLERANCE);
}
@@ -396,12 +388,11 @@ public class ProcessorLifecycleIT {
*/
@Test
public void validateProcessorCanBeStoppedWhenOnScheduledConstantlyFails() throws Exception {
- final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
- fc = fcsb.getFlowController();
+ final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+ flowManager = fcsb.getFlowManager();
- ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
- this.setControllerRootGroup(fc, testGroup);
- ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+ ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
+ ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
@@ -410,12 +401,11 @@ public class ProcessorLifecycleIT {
this.longRunningOnUnschedule(testProcessor, 100);
testProcessor.generateExceptionOnScheduled = true;
testProcessor.keepFailingOnScheduledTimes = Integer.MAX_VALUE;
- ProcessScheduler ps = fc.getProcessScheduler();
testProcNode.performValidation();
- ps.startProcessor(testProcNode, true);
+ processScheduler.startProcessor(testProcNode, true);
assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), SHORT_DELAY_TOLERANCE);
- ps.stopProcessor(testProcNode);
+ processScheduler.stopProcessor(testProcNode);
assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), SHORT_DELAY_TOLERANCE);
}
@@ -425,23 +415,21 @@ public class ProcessorLifecycleIT {
*/
@Test
public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyInterruptable() throws Exception {
- final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec");
- fc = fcsb.getFlowController();
+ final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec");
+ flowManager = fcsb.getFlowManager();
- ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
- this.setControllerRootGroup(fc, testGroup);
- ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+ ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
+ ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
// sets the scenario for the processor to run
this.blockingInterruptableOnUnschedule(testProcessor);
- ProcessScheduler ps = fc.getProcessScheduler();
testProcNode.performValidation();
- ps.startProcessor(testProcNode, true);
+ processScheduler.startProcessor(testProcNode, true);
assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), SHORT_DELAY_TOLERANCE);
- ps.stopProcessor(testProcNode);
+ processScheduler.stopProcessor(testProcNode);
assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), MEDIUM_DELAY_TOLERANCE);
}
@@ -451,23 +439,21 @@ public class ProcessorLifecycleIT {
*/
@Test
public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyUninterruptable() throws Exception {
- final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "1 sec");
- fc = fcsb.getFlowController();
+ final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "1 sec");
+ flowManager = fcsb.getFlowManager();
- ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
- this.setControllerRootGroup(fc, testGroup);
- ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+ ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
+ ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
// sets the scenario for the processor to run
this.blockingUninterruptableOnUnschedule(testProcessor);
- ProcessScheduler ps = fc.getProcessScheduler();
testProcNode.performValidation();
- ps.startProcessor(testProcNode, true);
+ processScheduler.startProcessor(testProcNode, true);
assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), MEDIUM_DELAY_TOLERANCE);
- ps.stopProcessor(testProcNode);
+ processScheduler.stopProcessor(testProcNode);
assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), MEDIUM_DELAY_TOLERANCE);
}
@@ -477,12 +463,11 @@ public class ProcessorLifecycleIT {
*/
@Test
public void validateProcessorCanBeStoppedWhenOnTriggerThrowsException() throws Exception {
- final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
- fc = fcsb.getFlowController();
+ final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+ flowManager = fcsb.getFlowManager();
- ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
- this.setControllerRootGroup(fc, testGroup);
- ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+ ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
+ ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
@@ -490,14 +475,13 @@ public class ProcessorLifecycleIT {
// sets the scenario for the processor to run
this.noop(testProcessor);
testProcessor.generateExceptionOnTrigger = true;
- ProcessScheduler ps = fc.getProcessScheduler();
testProcNode.performValidation();
- ps.startProcessor(testProcNode, true);
+ processScheduler.startProcessor(testProcNode, true);
assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), LONG_DELAY_TOLERANCE);
- ps.disableProcessor(testProcNode);
+ processScheduler.disableProcessor(testProcNode);
assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), LONG_DELAY_TOLERANCE);
- ps.stopProcessor(testProcNode);
+ processScheduler.stopProcessor(testProcNode);
assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), LONG_DELAY_TOLERANCE);
}
@@ -507,15 +491,13 @@ public class ProcessorLifecycleIT {
*/
@Test(expected = IllegalStateException.class)
public void validateStartFailsOnInvalidProcessorWithMissingProperty() throws Exception {
- final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
- fc = fcsb.getFlowController();
+ final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+ flowManager = fcsb.getFlowManager();
- ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
- this.setControllerRootGroup(fc, testGroup);
- ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+ ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
+ ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
fcsb.getSystemBundle().getBundleDetails().getCoordinate());
- ProcessScheduler ps = fc.getProcessScheduler();
- ps.startProcessor(testProcNode, true);
+ processScheduler.startProcessor(testProcNode, true);
fail();
}
@@ -525,15 +507,14 @@ public class ProcessorLifecycleIT {
*/
@Test(expected = IllegalStateException.class)
public void validateStartFailsOnInvalidProcessorWithDisabledService() throws Exception {
- final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
- fc = fcsb.getFlowController();
+ final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+ flowManager = fcsb.getFlowManager();
- ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
- this.setControllerRootGroup(fc, testGroup);
+ ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
- ControllerServiceNode testServiceNode = fc.createControllerService(TestService.class.getName(), "serv",
- fcsb.getSystemBundle().getBundleDetails().getCoordinate(), null, true);
- ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+ ControllerServiceNode testServiceNode = flowManager.createControllerService(TestService.class.getName(), "serv",
+ fcsb.getSystemBundle().getBundleDetails().getCoordinate(), null, true, true);
+ ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
fcsb.getSystemBundle().getBundleDetails().getCoordinate());
properties.put("S", testServiceNode.getIdentifier());
@@ -542,8 +523,7 @@ public class ProcessorLifecycleIT {
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
testProcessor.withService = true;
- ProcessScheduler ps = fc.getProcessScheduler();
- ps.startProcessor(testProcNode, true);
+ processScheduler.startProcessor(testProcNode, true);
fail();
}
@@ -552,17 +532,16 @@ public class ProcessorLifecycleIT {
*/
@Test
public void validateStartSucceedsOnProcessorWithEnabledService() throws Exception {
- final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
- fc = fcsb.getFlowController();
+ final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+ flowManager = fcsb.getFlowManager();
- ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
- this.setControllerRootGroup(fc, testGroup);
+ ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
- ControllerServiceNode testServiceNode = fc.createControllerService(TestService.class.getName(), "foo",
- fcsb.getSystemBundle().getBundleDetails().getCoordinate(), null, true);
+ ControllerServiceNode testServiceNode = flowManager.createControllerService(TestService.class.getName(), "foo",
+ fcsb.getSystemBundle().getBundleDetails().getCoordinate(), null, true, true);
testGroup.addControllerService(testServiceNode);
- ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+ ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testGroup.addProcessor(testProcNode);
@@ -573,12 +552,11 @@ public class ProcessorLifecycleIT {
testProcessor.withService = true;
this.noop(testProcessor);
- ProcessScheduler ps = fc.getProcessScheduler();
testServiceNode.performValidation();
- ps.enableControllerService(testServiceNode);
+ processScheduler.enableControllerService(testServiceNode);
testProcNode.performValidation();
- ps.startProcessor(testProcNode, true);
+ processScheduler.startProcessor(testProcNode, true);
Thread.sleep(500);
assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
@@ -641,7 +619,7 @@ public class ProcessorLifecycleIT {
testProcessor.setScenario(emptyRunnable, emptyRunnable, emptyRunnable, emptyRunnable);
}
- private FlowControllerAndSystemBundle buildFlowControllerForTest(final String propKey, final String propValue) throws Exception {
+ private FlowManagerAndSystemBundle buildFlowControllerForTest(final String propKey, final String propValue) throws Exception {
final Map<String, String> addProps = new HashMap<>();
addProps.put(NiFiProperties.ADMINISTRATIVE_YIELD_DURATION, "1 sec");
addProps.put(NiFiProperties.STATE_MANAGEMENT_CONFIG_FILE, "target/test-classes/state-management.xml");
@@ -659,43 +637,33 @@ public class ProcessorLifecycleIT {
extensionManager.discoverExtensions(systemBundle, Collections.emptySet());
final FlowController flowController = FlowController.createStandaloneInstance(mock(FlowFileEventRepository.class), nifiProperties,
- mock(Authorizer.class), mock(AuditService.class), null, new VolatileBulletinRepository(),
+ mock(Authorizer.class), mock(AuditService.class), null, new VolatileBulletinRepository(),
new FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths()),
mock(FlowRegistryClient.class), extensionManager);
- return new FlowControllerAndSystemBundle(flowController, systemBundle);
+ final FlowManager flowManager = flowController.getFlowManager();
+ this.processScheduler = flowController.getProcessScheduler();
+
+ return new FlowManagerAndSystemBundle(flowManager, systemBundle);
}
- private FlowControllerAndSystemBundle buildFlowControllerForTest() throws Exception {
+ private FlowManagerAndSystemBundle buildFlowControllerForTest() throws Exception {
return buildFlowControllerForTest(null, null);
}
- /**
- *
- */
- private void setControllerRootGroup(FlowController controller, ProcessGroup processGroup) {
- try {
- Method m = FlowController.class.getDeclaredMethod("setRootGroup", ProcessGroup.class);
- m.setAccessible(true);
- m.invoke(controller, processGroup);
- controller.initializeFlow();
- } catch (Exception e) {
- throw new IllegalStateException("Failed to set root group", e);
- }
- }
- private static class FlowControllerAndSystemBundle {
+ private static class FlowManagerAndSystemBundle {
- private final FlowController flowController;
+ private final FlowManager flowManager;
private final Bundle systemBundle;
- public FlowControllerAndSystemBundle(FlowController flowController, Bundle systemBundle) {
- this.flowController = flowController;
+ public FlowManagerAndSystemBundle(FlowManager flowManager, Bundle systemBundle) {
+ this.flowManager = flowManager;
this.systemBundle = systemBundle;
}
- public FlowController getFlowController() {
- return flowController;
+ public FlowManager getFlowManager() {
+ return flowManager;
}
public Bundle getSystemBundle() {
@@ -775,8 +743,7 @@ public class ProcessorLifecycleIT {
.identifiesControllerService(ITestservice.class)
.build();
- return this.withService ? Arrays.asList(new PropertyDescriptor[]{PROP, SERVICE})
- : Arrays.asList(new PropertyDescriptor[]{PROP});
+ return this.withService ? Arrays.asList(PROP, SERVICE) : Arrays.asList(PROP);
}
@Override
@@ -788,20 +755,15 @@ public class ProcessorLifecycleIT {
}
}
- /**
- */
+
public static class TestService extends AbstractControllerService implements ITestservice {
}
- /**
- */
public static interface ITestservice extends ControllerService {
-
}
- /**
- */
+
private static class EmptyRunnable implements Runnable {
@Override
@@ -810,8 +772,7 @@ public class ProcessorLifecycleIT {
}
}
- /**
- */
+
private static class BlockingInterruptableRunnable implements Runnable {
@Override
@@ -824,8 +785,7 @@ public class ProcessorLifecycleIT {
}
}
- /**
- */
+
private static class BlockingUninterruptableRunnable implements Runnable {
@Override
@@ -840,8 +800,7 @@ public class ProcessorLifecycleIT {
}
}
- /**
- */
+
private static class RandomOrFixedDelayedRunnable implements Runnable {
private final int delayLimit;
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/StandardProcessSchedulerIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/StandardProcessSchedulerIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/StandardProcessSchedulerIT.java
deleted file mode 100644
index fac0272..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/StandardProcessSchedulerIT.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.controller.scheduling;
-
-import org.apache.nifi.bundle.Bundle;
-import org.apache.nifi.components.state.StateManagerProvider;
-import org.apache.nifi.controller.FlowController;
-import org.apache.nifi.controller.service.ControllerServiceNode;
-import org.apache.nifi.controller.service.ControllerServiceState;
-import org.apache.nifi.controller.service.StandardControllerServiceProvider;
-import org.apache.nifi.engine.FlowEngine;
-import org.apache.nifi.nar.ExtensionDiscoveringManager;
-import org.apache.nifi.nar.StandardExtensionDiscoveringManager;
-import org.apache.nifi.nar.SystemBundle;
-import org.apache.nifi.registry.VariableRegistry;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.util.SynchronousValidationTrigger;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.util.Collections;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class StandardProcessSchedulerIT {
- private final StateManagerProvider stateMgrProvider = Mockito.mock(StateManagerProvider.class);
- private VariableRegistry variableRegistry = VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY;
- private FlowController controller;
- private NiFiProperties nifiProperties;
- private Bundle systemBundle;
- private ExtensionDiscoveringManager extensionManager;
- private volatile String propsFile = TestStandardProcessScheduler.class.getResource("/standardprocessschedulertest.nifi.properties").getFile();
-
- @Before
- public void setup() throws InitializationException {
- this.nifiProperties = NiFiProperties.createBasicNiFiProperties(propsFile, null);
-
- // load the system bundle
- systemBundle = SystemBundle.create(nifiProperties);
- extensionManager = new StandardExtensionDiscoveringManager();
- extensionManager.discoverExtensions(systemBundle, Collections.emptySet());
-
- controller = Mockito.mock(FlowController.class);
- Mockito.when(controller.getExtensionManager()).thenReturn(extensionManager);
- }
-
- /**
- * Validates that the service that is currently in ENABLING state can be
- * disabled and that its @OnDisabled operation will be invoked as soon as
- *
- * @OnEnable finishes.
- */
- @Test
- public void validateLongEnablingServiceCanStillBeDisabled() throws Exception {
- final StandardProcessScheduler scheduler = new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), null, null, stateMgrProvider, nifiProperties);
- final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null,
- stateMgrProvider, variableRegistry, nifiProperties, new SynchronousValidationTrigger());
-
- final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(),
- "1", systemBundle.getBundleDetails().getCoordinate(), null, false);
-
- final LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation();
- ts.setLimit(3000);
- scheduler.enableControllerService(serviceNode);
- Thread.sleep(2000);
- assertTrue(serviceNode.isActive());
- assertEquals(1, ts.enableInvocationCount());
-
- Thread.sleep(500);
- scheduler.disableControllerService(serviceNode);
- assertFalse(serviceNode.isActive());
- assertEquals(ControllerServiceState.DISABLING, serviceNode.getState());
- assertEquals(0, ts.disableInvocationCount());
- // wait a bit. . . Enabling will finish and @OnDisabled will be invoked
- // automatically
- Thread.sleep(4000);
- assertEquals(ControllerServiceState.DISABLED, serviceNode.getState());
- assertEquals(1, ts.disableInvocationCount());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
index 92e6e5d..920999e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
@@ -21,22 +21,30 @@ import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.ValidationTrigger;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ExtensionBuilder;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.LoggableComponent;
+import org.apache.nifi.controller.NodeTypeProvider;
+import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReloadComponent;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.StandardProcessorNode;
import org.apache.nifi.controller.TerminationAwareLogger;
import org.apache.nifi.controller.ValidationContextFactory;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.kerberos.KerberosConfig;
import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
import org.apache.nifi.controller.reporting.StandardReportingTaskNode;
import org.apache.nifi.controller.scheduling.processors.FailOnScheduledProcessor;
import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.controller.service.StandardControllerServiceNode;
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
@@ -93,6 +101,12 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anySet;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
public class TestStandardProcessScheduler {
@@ -102,10 +116,13 @@ public class TestStandardProcessScheduler {
private final StateManagerProvider stateMgrProvider = Mockito.mock(StateManagerProvider.class);
private VariableRegistry variableRegistry = VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY;
private FlowController controller;
+ private FlowManager flowManager;
private ProcessGroup rootGroup;
private NiFiProperties nifiProperties;
private Bundle systemBundle;
private ExtensionDiscoveringManager extensionManager;
+ private ControllerServiceProvider serviceProvider;
+
private volatile String propsFile = TestStandardProcessScheduler.class.getResource("/standardprocessschedulertest.nifi.properties").getFile();
@Before
@@ -125,19 +142,23 @@ public class TestStandardProcessScheduler {
reportingTask = new TestReportingTask();
final ReportingInitializationContext config = new StandardReportingInitializationContext(UUID.randomUUID().toString(), "Test", SchedulingStrategy.TIMER_DRIVEN, "5 secs",
- Mockito.mock(ComponentLog.class), null, nifiProperties, null);
+ Mockito.mock(ComponentLog.class), null, KerberosConfig.NOT_CONFIGURED, null);
reportingTask.initialize(config);
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(null, variableRegistry);
final TerminationAwareLogger logger = Mockito.mock(TerminationAwareLogger.class);
final ReloadComponent reloadComponent = Mockito.mock(ReloadComponent.class);
final LoggableComponent<ReportingTask> loggableComponent = new LoggableComponent<>(reportingTask, systemBundle.getBundleDetails().getCoordinate(), logger);
- taskNode = new StandardReportingTaskNode(loggableComponent, UUID.randomUUID().toString(), null, scheduler, validationContextFactory,
+ taskNode = new StandardReportingTaskNode(loggableComponent, UUID.randomUUID().toString(), Mockito.mock(FlowController.class), scheduler, validationContextFactory,
new StandardComponentVariableRegistry(variableRegistry), reloadComponent, extensionManager, new SynchronousValidationTrigger());
+ flowManager = Mockito.mock(FlowManager.class);
controller = Mockito.mock(FlowController.class);
+ when(controller.getFlowManager()).thenReturn(flowManager);
Mockito.when(controller.getExtensionManager()).thenReturn(extensionManager);
+ serviceProvider = new StandardControllerServiceProvider(controller, scheduler, null);
+
final ConcurrentMap<String, ProcessorNode> processorMap = new ConcurrentHashMap<>();
Mockito.doAnswer(new Answer<ProcessorNode>() {
@Override
@@ -145,7 +166,7 @@ public class TestStandardProcessScheduler {
final String id = invocation.getArgumentAt(0, String.class);
return processorMap.get(id);
}
- }).when(controller).getProcessorNode(Mockito.anyString());
+ }).when(flowManager).getProcessorNode(Mockito.anyString());
Mockito.doAnswer(new Answer<Object>() {
@Override
@@ -154,10 +175,40 @@ public class TestStandardProcessScheduler {
processorMap.putIfAbsent(procNode.getIdentifier(), procNode);
return null;
}
- }).when(controller).onProcessorAdded(Mockito.any(ProcessorNode.class));
+ }).when(flowManager).onProcessorAdded(any(ProcessorNode.class));
+
+ when(controller.getControllerServiceProvider()).thenReturn(serviceProvider);
rootGroup = new MockProcessGroup(controller);
- Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(rootGroup);
+ when(flowManager.getGroup(Mockito.anyString())).thenReturn(rootGroup);
+
+ when(controller.getReloadComponent()).thenReturn(Mockito.mock(ReloadComponent.class));
+
+ doAnswer(new Answer<ControllerServiceNode>() {
+ @Override
+ public ControllerServiceNode answer(final InvocationOnMock invocation) throws Throwable {
+ final String type = invocation.getArgumentAt(0, String.class);
+ final String id = invocation.getArgumentAt(1, String.class);
+ final BundleCoordinate bundleCoordinate = invocation.getArgumentAt(2, BundleCoordinate.class);
+
+ final ControllerServiceNode serviceNode = new ExtensionBuilder()
+ .identifier(id)
+ .type(type)
+ .bundleCoordinate(bundleCoordinate)
+ .controllerServiceProvider(serviceProvider)
+ .processScheduler(Mockito.mock(ProcessScheduler.class))
+ .nodeTypeProvider(Mockito.mock(NodeTypeProvider.class))
+ .validationTrigger(Mockito.mock(ValidationTrigger.class))
+ .reloadComponent(Mockito.mock(ReloadComponent.class))
+ .variableRegistry(variableRegistry)
+ .stateManagerProvider(Mockito.mock(StateManagerProvider.class))
+ .extensionManager(extensionManager)
+ .buildControllerService();
+
+ serviceProvider.onControllerServiceAdded(serviceNode);
+ return serviceNode;
+ }
+ }).when(flowManager).createControllerService(anyString(), anyString(), any(BundleCoordinate.class), anySet(), anyBoolean(), anyBoolean());
}
@After
@@ -196,21 +247,20 @@ public class TestStandardProcessScheduler {
public void testDisableControllerServiceWithProcessorTryingToStartUsingIt() throws InterruptedException {
final String uuid = UUID.randomUUID().toString();
final Processor proc = new ServiceReferencingProcessor();
- proc.initialize(new StandardProcessorInitializationContext(uuid, null, null, null, null));
+ proc.initialize(new StandardProcessorInitializationContext(uuid, null, null, null, KerberosConfig.NOT_CONFIGURED));
final ReloadComponent reloadComponent = Mockito.mock(ReloadComponent.class);
- final StandardControllerServiceProvider serviceProvider = new StandardControllerServiceProvider(controller, scheduler, null,
- Mockito.mock(StateManagerProvider.class), variableRegistry, nifiProperties, new SynchronousValidationTrigger());
- final ControllerServiceNode service = serviceProvider.createControllerService(NoStartServiceImpl.class.getName(), "service",
- systemBundle.getBundleDetails().getCoordinate(), null, true);
+ final ControllerServiceNode service = flowManager.createControllerService(NoStartServiceImpl.class.getName(), "service",
+ systemBundle.getBundleDetails().getCoordinate(), null, true, true);
+
rootGroup.addControllerService(service);
final LoggableComponent<Processor> loggableComponent = new LoggableComponent<>(proc, systemBundle.getBundleDetails().getCoordinate(), null);
- final ProcessorNode procNode = new StandardProcessorNode(loggableComponent, uuid,
- new StandardValidationContextFactory(serviceProvider, variableRegistry),
- scheduler, serviceProvider, nifiProperties, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY),
- reloadComponent, extensionManager, new SynchronousValidationTrigger());
+ final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(serviceProvider, variableRegistry);
+ final ProcessorNode procNode = new StandardProcessorNode(loggableComponent, uuid, validationContextFactory, scheduler,
+ serviceProvider, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent, extensionManager, new SynchronousValidationTrigger());
+
rootGroup.addProcessor(procNode);
Map<String, String> procProps = new HashMap<>();
@@ -288,11 +338,9 @@ public class TestStandardProcessScheduler {
@Test
public void validateServiceEnablementLogicHappensOnlyOnce() throws Exception {
final StandardProcessScheduler scheduler = createScheduler();
- final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null,
- stateMgrProvider, variableRegistry, nifiProperties, new SynchronousValidationTrigger());
- final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
- "1", systemBundle.getBundleDetails().getCoordinate(), null, false);
+ final ControllerServiceNode serviceNode = flowManager.createControllerService(SimpleTestService.class.getName(),
+ "1", systemBundle.getBundleDetails().getCoordinate(), null, false, true);
assertFalse(serviceNode.isActive());
final SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
@@ -330,11 +378,9 @@ public class TestStandardProcessScheduler {
@Test
public void validateDisabledServiceCantBeDisabled() throws Exception {
final StandardProcessScheduler scheduler = createScheduler();
- final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider,
- variableRegistry, nifiProperties, new SynchronousValidationTrigger());
- final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
- "1", systemBundle.getBundleDetails().getCoordinate(), null, false);
+ final ControllerServiceNode serviceNode = flowManager.createControllerService(SimpleTestService.class.getName(),
+ "1", systemBundle.getBundleDetails().getCoordinate(), null, false, true);
final SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
final ExecutorService executor = Executors.newCachedThreadPool();
@@ -370,10 +416,9 @@ public class TestStandardProcessScheduler {
@Test
public void validateEnabledServiceCanOnlyBeDisabledOnce() throws Exception {
final StandardProcessScheduler scheduler = createScheduler();
- final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider,
- variableRegistry, nifiProperties, new SynchronousValidationTrigger());
- final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
- "1", systemBundle.getBundleDetails().getCoordinate(), null, false);
+ final ControllerServiceNode serviceNode = flowManager.createControllerService(SimpleTestService.class.getName(),
+ "1", systemBundle.getBundleDetails().getCoordinate(), null, false, true);
+
final SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
scheduler.enableControllerService(serviceNode);
assertTrue(serviceNode.isActive());
@@ -405,10 +450,9 @@ public class TestStandardProcessScheduler {
@Test
public void validateDisablingOfTheFailedService() throws Exception {
final StandardProcessScheduler scheduler = createScheduler();
- final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null,
- stateMgrProvider, variableRegistry, nifiProperties, new SynchronousValidationTrigger());
- final ControllerServiceNode serviceNode = provider.createControllerService(FailingService.class.getName(),
- "1", systemBundle.getBundleDetails().getCoordinate(), null, false);
+
+ final ControllerServiceNode serviceNode = flowManager.createControllerService(FailingService.class.getName(),
+ "1", systemBundle.getBundleDetails().getCoordinate(), null, false, true);
scheduler.enableControllerService(serviceNode);
Thread.sleep(1000);
scheduler.shutdown();
@@ -438,12 +482,11 @@ public class TestStandardProcessScheduler {
@Ignore
public void validateEnabledDisableMultiThread() throws Exception {
final StandardProcessScheduler scheduler = createScheduler();
- final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider,
- variableRegistry, nifiProperties, new SynchronousValidationTrigger());
+ final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null);
final ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 200; i++) {
- final ControllerServiceNode serviceNode = provider.createControllerService(RandomShortDelayEnablingService.class.getName(), "1",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
+ final ControllerServiceNode serviceNode = flowManager.createControllerService(RandomShortDelayEnablingService.class.getName(), "1",
+ systemBundle.getBundleDetails().getCoordinate(), null, false, true);
executor.execute(new Runnable() {
@Override
@@ -482,10 +525,9 @@ public class TestStandardProcessScheduler {
@Test
public void validateNeverEnablingServiceCanStillBeDisabled() throws Exception {
final StandardProcessScheduler scheduler = createScheduler();
- final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null,
- stateMgrProvider, variableRegistry, nifiProperties, new SynchronousValidationTrigger());
- final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(),
- "1", systemBundle.getBundleDetails().getCoordinate(), null, false);
+
+ final ControllerServiceNode serviceNode = flowManager.createControllerService(LongEnablingService.class.getName(),
+ "1", systemBundle.getBundleDetails().getCoordinate(), null, false, true);
final LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation();
ts.setLimit(Long.MAX_VALUE);
scheduler.enableControllerService(serviceNode);
@@ -506,14 +548,13 @@ public class TestStandardProcessScheduler {
final FailOnScheduledProcessor proc = new FailOnScheduledProcessor();
proc.setDesiredFailureCount(3);
- proc.initialize(new StandardProcessorInitializationContext(UUID.randomUUID().toString(), null, null, null, nifiProperties));
+ proc.initialize(new StandardProcessorInitializationContext(UUID.randomUUID().toString(), null, null, null, KerberosConfig.NOT_CONFIGURED));
final ReloadComponent reloadComponent = Mockito.mock(ReloadComponent.class);
final LoggableComponent<Processor> loggableComponent = new LoggableComponent<>(proc, systemBundle.getBundleDetails().getCoordinate(), null);
final ProcessorNode procNode = new StandardProcessorNode(loggableComponent, UUID.randomUUID().toString(),
- new StandardValidationContextFactory(controller, variableRegistry),
- scheduler, controller, nifiProperties, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY),
- reloadComponent, extensionManager, new SynchronousValidationTrigger());
+ new StandardValidationContextFactory(serviceProvider, variableRegistry),
+ scheduler, serviceProvider, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent, extensionManager, new SynchronousValidationTrigger());
procNode.performValidation();
rootGroup.addProcessor(procNode);
@@ -527,20 +568,19 @@ public class TestStandardProcessScheduler {
}
// Test that if processor times out in the @OnScheduled but responds to interrupt, it keeps getting scheduled
- @Test(timeout = 1000000)
+ @Test(timeout = 10000)
public void testProcessorTimeOutRespondsToInterrupt() throws InterruptedException {
final FailOnScheduledProcessor proc = new FailOnScheduledProcessor();
proc.setDesiredFailureCount(0);
proc.setOnScheduledSleepDuration(20, TimeUnit.MINUTES, true, 1);
- proc.initialize(new StandardProcessorInitializationContext(UUID.randomUUID().toString(), null, null, null, nifiProperties));
+ proc.initialize(new StandardProcessorInitializationContext(UUID.randomUUID().toString(), null, null, null, KerberosConfig.NOT_CONFIGURED));
final ReloadComponent reloadComponent = Mockito.mock(ReloadComponent.class);
final LoggableComponent<Processor> loggableComponent = new LoggableComponent<>(proc, systemBundle.getBundleDetails().getCoordinate(), null);
final ProcessorNode procNode = new StandardProcessorNode(loggableComponent, UUID.randomUUID().toString(),
- new StandardValidationContextFactory(controller, variableRegistry),
- scheduler, controller, nifiProperties, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY),
- reloadComponent, extensionManager, new SynchronousValidationTrigger());
+ new StandardValidationContextFactory(serviceProvider, variableRegistry),
+ scheduler, serviceProvider, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent, extensionManager, new SynchronousValidationTrigger());
rootGroup.addProcessor(procNode);
@@ -563,14 +603,13 @@ public class TestStandardProcessScheduler {
proc.setDesiredFailureCount(0);
proc.setOnScheduledSleepDuration(20, TimeUnit.MINUTES, false, 1);
- proc.initialize(new StandardProcessorInitializationContext(UUID.randomUUID().toString(), null, null, null, nifiProperties));
+ proc.initialize(new StandardProcessorInitializationContext(UUID.randomUUID().toString(), null, null, null, KerberosConfig.NOT_CONFIGURED));
final ReloadComponent reloadComponent = Mockito.mock(ReloadComponent.class);
final LoggableComponent<Processor> loggableComponent = new LoggableComponent<>(proc, systemBundle.getBundleDetails().getCoordinate(), null);
final ProcessorNode procNode = new StandardProcessorNode(loggableComponent, UUID.randomUUID().toString(),
- new StandardValidationContextFactory(controller, variableRegistry),
- scheduler, controller, nifiProperties, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY),
- reloadComponent, extensionManager, new SynchronousValidationTrigger());
+ new StandardValidationContextFactory(serviceProvider, variableRegistry),
+ scheduler, serviceProvider, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent, extensionManager, new SynchronousValidationTrigger());
rootGroup.addProcessor(procNode);
@@ -632,6 +671,6 @@ public class TestStandardProcessScheduler {
}
private StandardProcessScheduler createScheduler() {
- return new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), null, null, stateMgrProvider, nifiProperties);
+ return new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), Mockito.mock(FlowController.class), null, stateMgrProvider, nifiProperties);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java
index f019257..196e10e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java
@@ -102,9 +102,11 @@ public class StandardFlowSerializerTest {
@Test
public void testSerializationEscapingAndFiltering() throws Exception {
- final ProcessorNode dummy = controller.createProcessor(DummyScheduledProcessor.class.getName(), UUID.randomUUID().toString(), systemBundle.getBundleDetails().getCoordinate());
+ final ProcessorNode dummy = controller.getFlowManager().createProcessor(DummyScheduledProcessor.class.getName(),
+ UUID.randomUUID().toString(), systemBundle.getBundleDetails().getCoordinate());
+
dummy.setComments(RAW_COMMENTS);
- controller.getRootGroup().addProcessor(dummy);
+ controller.getFlowManager().getRootGroup().addProcessor(dummy);
// serialize the controller
final ByteArrayOutputStream os = new ByteArrayOutputStream();
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderIT.java
index 0f4d3ce..2bcf3d9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderIT.java
@@ -18,9 +18,16 @@
package org.apache.nifi.controller.service;
import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.ValidationTrigger;
+import org.apache.nifi.controller.ExtensionBuilder;
import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.NodeTypeProvider;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ReloadComponent;
+import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.controller.service.mock.MockProcessGroup;
import org.apache.nifi.controller.service.mock.ServiceA;
@@ -32,7 +39,6 @@ import org.apache.nifi.nar.StandardExtensionDiscoveringManager;
import org.apache.nifi.nar.SystemBundle;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.util.SynchronousValidationTrigger;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
@@ -95,20 +101,45 @@ public class StandardControllerServiceProviderIT {
*/
@Test(timeout = 120000)
public void testConcurrencyWithEnablingReferencingServicesGraph() throws InterruptedException, ExecutionException {
- final StandardProcessScheduler scheduler = new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), null, null, stateManagerProvider, niFiProperties);
+ final StandardProcessScheduler scheduler = new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), Mockito.mock(FlowController.class),
+ null, stateManagerProvider, niFiProperties);
+
for (int i = 0; i < 5000; i++) {
testEnableReferencingServicesGraph(scheduler);
}
}
+ private ControllerServiceNode createControllerService(final String type, final String id, final BundleCoordinate bundleCoordinate, final ControllerServiceProvider serviceProvider) {
+ final ControllerServiceNode serviceNode = new ExtensionBuilder()
+ .identifier(id)
+ .type(type)
+ .bundleCoordinate(bundleCoordinate)
+ .controllerServiceProvider(serviceProvider)
+ .processScheduler(Mockito.mock(ProcessScheduler.class))
+ .nodeTypeProvider(Mockito.mock(NodeTypeProvider.class))
+ .validationTrigger(Mockito.mock(ValidationTrigger.class))
+ .reloadComponent(Mockito.mock(ReloadComponent.class))
+ .variableRegistry(variableRegistry)
+ .stateManagerProvider(Mockito.mock(StateManagerProvider.class))
+ .extensionManager(extensionManager)
+ .buildControllerService();
+
+ serviceProvider.onControllerServiceAdded(serviceNode);
+
+ return serviceNode;
+ }
+
public void testEnableReferencingServicesGraph(final StandardProcessScheduler scheduler) throws InterruptedException, ExecutionException {
final FlowController controller = Mockito.mock(FlowController.class);
+
final ProcessGroup procGroup = new MockProcessGroup(controller);
- Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
+ final FlowManager flowManager = Mockito.mock(FlowManager.class);
+ Mockito.when(controller.getFlowManager()).thenReturn(flowManager);
+
+ Mockito.when(flowManager.getGroup(Mockito.anyString())).thenReturn(procGroup);
Mockito.when(controller.getExtensionManager()).thenReturn(extensionManager);
- final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null,
- stateManagerProvider, variableRegistry, niFiProperties, new SynchronousValidationTrigger());
+ final ControllerServiceProvider serviceProvider = new StandardControllerServiceProvider(controller, scheduler, null);
// build a graph of controller services with dependencies as such:
//
@@ -122,14 +153,10 @@ public class StandardControllerServiceProviderIT {
// So we have to verify that if D is enabled, when we enable its referencing services,
// we enable C and B, even if we attempt to enable C before B... i.e., if we try to enable C, we cannot do so
// until B is first enabled so ensure that we enable B first.
- final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
- final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
- final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
- final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
+ final ControllerServiceNode serviceNode1 = createControllerService(ServiceA.class.getName(), "1", systemBundle.getBundleDetails().getCoordinate(), serviceProvider);
+ final ControllerServiceNode serviceNode2 = createControllerService(ServiceA.class.getName(), "2", systemBundle.getBundleDetails().getCoordinate(), serviceProvider);
+ final ControllerServiceNode serviceNode3 = createControllerService(ServiceA.class.getName(), "3", systemBundle.getBundleDetails().getCoordinate(), serviceProvider);
+ final ControllerServiceNode serviceNode4 = createControllerService(ServiceB.class.getName(), "4", systemBundle.getBundleDetails().getCoordinate(), serviceProvider);
procGroup.addControllerService(serviceNode1);
procGroup.addControllerService(serviceNode2);
@@ -141,8 +168,9 @@ public class StandardControllerServiceProviderIT {
setProperty(serviceNode3, ServiceA.OTHER_SERVICE.getName(), "2");
setProperty(serviceNode3, ServiceA.OTHER_SERVICE_2.getName(), "4");
- provider.enableControllerService(serviceNode4).get();
- provider.enableReferencingServices(serviceNode4);
+ serviceNode4.performValidation();
+ serviceProvider.enableControllerService(serviceNode4).get();
+ serviceProvider.enableReferencingServices(serviceNode4);
// Verify that the services are either ENABLING or ENABLED, and wait for all of them to become ENABLED.
// Note that we set a timeout of 10 seconds, in case a bug occurs and the services never become ENABLED.
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
index 0a0b05f..f70ce6e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
@@ -17,10 +17,15 @@
package org.apache.nifi.controller.service;
import org.apache.nifi.bundle.Bundle;
-import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.ValidationTrigger;
import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ExtensionBuilder;
import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.NodeTypeProvider;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ReloadComponent;
import org.apache.nifi.nar.ExtensionDiscoveringManager;
import org.apache.nifi.nar.StandardExtensionDiscoveringManager;
import org.apache.nifi.nar.SystemBundle;
@@ -28,7 +33,6 @@ import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.variable.FileBasedVariableRegistry;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.util.SynchronousValidationTrigger;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -37,6 +41,7 @@ import org.mockito.Mockito;
import java.util.Collections;
+
public class StandardControllerServiceProviderTest {
private ControllerService proxied;
@@ -48,7 +53,7 @@ public class StandardControllerServiceProviderTest {
private static FlowController flowController;
@BeforeClass
- public static void setupSuite() throws Exception {
+ public static void setupSuite() {
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, StandardControllerServiceProviderTest.class.getResource("/conf/nifi.properties").getFile());
nifiProperties = NiFiProperties.createBasicNiFiProperties(null, null);
@@ -63,37 +68,35 @@ public class StandardControllerServiceProviderTest {
Mockito.when(flowController.getExtensionManager()).thenReturn(extensionManager);
}
+
@Before
public void setup() throws Exception {
String id = "id";
String clazz = "org.apache.nifi.controller.service.util.TestControllerService";
- ControllerServiceProvider provider = new StandardControllerServiceProvider(flowController, null, null, new StateManagerProvider() {
- @Override
- public StateManager getStateManager(final String componentId) {
- return Mockito.mock(StateManager.class);
- }
-
- @Override
- public void shutdown() {
- }
-
- @Override
- public void enableClusterProvider() {
- }
-
- @Override
- public void disableClusterProvider() {
- }
-
- @Override
- public void onComponentRemoved(String componentId) {
- }
- }, variableRegistry, nifiProperties, new SynchronousValidationTrigger());
- ControllerServiceNode node = provider.createControllerService(clazz, id, systemBundle.getBundleDetails().getCoordinate(), null, true);
+ ControllerServiceProvider provider = new StandardControllerServiceProvider(Mockito.mock(FlowController.class), null, null);
+ ControllerServiceNode node = createControllerService(clazz, id, systemBundle.getBundleDetails().getCoordinate(), provider);
proxied = node.getProxiedControllerService();
implementation = node.getControllerServiceImplementation();
}
+ private ControllerServiceNode createControllerService(final String type, final String id, final BundleCoordinate bundleCoordinate, final ControllerServiceProvider serviceProvider) {
+ final ControllerServiceNode serviceNode = new ExtensionBuilder()
+ .identifier(id)
+ .type(type)
+ .bundleCoordinate(bundleCoordinate)
+ .controllerServiceProvider(serviceProvider)
+ .processScheduler(Mockito.mock(ProcessScheduler.class))
+ .nodeTypeProvider(Mockito.mock(NodeTypeProvider.class))
+ .validationTrigger(Mockito.mock(ValidationTrigger.class))
+ .reloadComponent(Mockito.mock(ReloadComponent.class))
+ .variableRegistry(variableRegistry)
+ .stateManagerProvider(Mockito.mock(StateManagerProvider.class))
+ .extensionManager(extensionManager)
+ .buildControllerService();
+
+ return serviceNode;
+ }
+
@Test(expected = UnsupportedOperationException.class)
public void testCallProxiedOnPropertyModified() {
proxied.onPropertyModified(null, "oldValue", "newValue");