You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/11/06 16:24:03 UTC
[2/9] nifi git commit: NIFI-5769: Refactored FlowController to use
Composition over Inheritance - Ensure that when root group is set,
that we register its ID in FlowManager
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
index d317000..daee3c8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
@@ -18,14 +18,20 @@
package org.apache.nifi.controller.service;
import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.ValidationTrigger;
+import org.apache.nifi.controller.ExtensionBuilder;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.LoggableComponent;
+import org.apache.nifi.controller.NodeTypeProvider;
+import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReloadComponent;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.StandardProcessorNode;
+import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.controller.service.mock.DummyProcessor;
import org.apache.nifi.controller.service.mock.MockProcessGroup;
@@ -116,6 +122,9 @@ public class TestStandardControllerServiceProvider {
controller = Mockito.mock(FlowController.class);
Mockito.when(controller.getExtensionManager()).thenReturn(extensionManager);
+ final FlowManager flowManager = Mockito.mock(FlowManager.class);
+ Mockito.when(controller.getFlowManager()).thenReturn(flowManager);
+
final ConcurrentMap<String, ProcessorNode> processorMap = new ConcurrentHashMap<>();
Mockito.doAnswer(new Answer<ProcessorNode>() {
@Override
@@ -123,7 +132,7 @@ public class TestStandardControllerServiceProvider {
final String id = invocation.getArgumentAt(0, String.class);
return processorMap.get(id);
}
- }).when(controller).getProcessorNode(Mockito.anyString());
+ }).when(flowManager).getProcessorNode(Mockito.anyString());
Mockito.doAnswer(new Answer<Object>() {
@Override
@@ -132,11 +141,12 @@ public class TestStandardControllerServiceProvider {
processorMap.putIfAbsent(procNode.getIdentifier(), procNode);
return null;
}
- }).when(controller).onProcessorAdded(Mockito.any(ProcessorNode.class));
+ }).when(flowManager).onProcessorAdded(Mockito.any(ProcessorNode.class));
}
private StandardProcessScheduler createScheduler() {
- return new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), null, null, stateManagerProvider, niFiProperties);
+ return new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), Mockito.mock(FlowController.class),
+ null, stateManagerProvider, niFiProperties);
}
private void setProperty(ControllerServiceNode serviceNode, String propName, String propValue) {
@@ -145,19 +155,42 @@ public class TestStandardControllerServiceProvider {
serviceNode.setProperties(props);
}
+
+ private ControllerServiceNode createControllerService(final String type, final String id, final BundleCoordinate bundleCoordinate, final ControllerServiceProvider serviceProvider) {
+ final ControllerServiceNode serviceNode = new ExtensionBuilder()
+ .identifier(id)
+ .type(type)
+ .bundleCoordinate(bundleCoordinate)
+ .controllerServiceProvider(serviceProvider)
+ .processScheduler(Mockito.mock(ProcessScheduler.class))
+ .nodeTypeProvider(Mockito.mock(NodeTypeProvider.class))
+ .validationTrigger(Mockito.mock(ValidationTrigger.class))
+ .reloadComponent(Mockito.mock(ReloadComponent.class))
+ .variableRegistry(variableRegistry)
+ .stateManagerProvider(Mockito.mock(StateManagerProvider.class))
+ .extensionManager(extensionManager)
+ .buildControllerService();
+
+ serviceProvider.onControllerServiceAdded(serviceNode);
+
+ return serviceNode;
+ }
+
+
@Test
public void testDisableControllerService() {
final ProcessGroup procGroup = new MockProcessGroup(controller);
final FlowController controller = Mockito.mock(FlowController.class);
- Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
+ final FlowManager flowManager = Mockito.mock(FlowManager.class);
+ Mockito.when(controller.getFlowManager()).thenReturn(flowManager);
+
+ Mockito.when(flowManager.getGroup(Mockito.anyString())).thenReturn(procGroup);
Mockito.when(controller.getExtensionManager()).thenReturn(extensionManager);
final StandardProcessScheduler scheduler = createScheduler();
- final StandardControllerServiceProvider provider =
- new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider, variableRegistry, niFiProperties, new SynchronousValidationTrigger());
+ final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null);
- final ControllerServiceNode serviceNode = provider.createControllerService(ServiceB.class.getName(), "B",
- systemBundle.getBundleDetails().getCoordinate(), null,false);
+ final ControllerServiceNode serviceNode = createControllerService(ServiceB.class.getName(), "B", systemBundle.getBundleDetails().getCoordinate(), provider);
serviceNode.performValidation();
serviceNode.getValidationStatus(5, TimeUnit.SECONDS);
provider.enableControllerService(serviceNode);
@@ -168,17 +201,19 @@ public class TestStandardControllerServiceProvider {
public void testEnableDisableWithReference() {
final ProcessGroup group = new MockProcessGroup(controller);
final FlowController controller = Mockito.mock(FlowController.class);
- Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(group);
+ final FlowManager flowManager = Mockito.mock(FlowManager.class);
+ Mockito.when(controller.getFlowManager()).thenReturn(flowManager);
+
+ Mockito.when(flowManager.getGroup(Mockito.anyString())).thenReturn(group);
Mockito.when(controller.getExtensionManager()).thenReturn(extensionManager);
final StandardProcessScheduler scheduler = createScheduler();
- final StandardControllerServiceProvider provider =
- new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider, variableRegistry, niFiProperties, new SynchronousValidationTrigger());
+ final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null);
+
+ Mockito.when(controller.getControllerServiceProvider()).thenReturn(provider);
- final ControllerServiceNode serviceNodeB = provider.createControllerService(ServiceB.class.getName(), "B",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
- final ControllerServiceNode serviceNodeA = provider.createControllerService(ServiceA.class.getName(), "A",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
+ final ControllerServiceNode serviceNodeB = createControllerService(ServiceB.class.getName(), "B", systemBundle.getBundleDetails().getCoordinate(), provider);
+ final ControllerServiceNode serviceNodeA = createControllerService(ServiceA.class.getName(), "A", systemBundle.getBundleDetails().getCoordinate(), provider);
group.addControllerService(serviceNodeA);
group.addControllerService(serviceNodeB);
@@ -224,15 +259,17 @@ public class TestStandardControllerServiceProvider {
public void testOrderingOfServices() {
final ProcessGroup procGroup = new MockProcessGroup(controller);
final FlowController controller = Mockito.mock(FlowController.class);
- Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
+
+ final FlowManager flowManager = Mockito.mock(FlowManager.class);
+ Mockito.when(controller.getFlowManager()).thenReturn(flowManager);
+
+ Mockito.when(flowManager.getGroup(Mockito.anyString())).thenReturn(procGroup);
Mockito.when(controller.getExtensionManager()).thenReturn(extensionManager);
final StandardControllerServiceProvider provider =
- new StandardControllerServiceProvider(controller, null, null, stateManagerProvider, variableRegistry, niFiProperties, new SynchronousValidationTrigger());
- final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
- final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceB.class.getName(), "2",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
+ new StandardControllerServiceProvider(controller, null, null);
+ final ControllerServiceNode serviceNode1 = createControllerService(ServiceA.class.getName(), "1", systemBundle.getBundleDetails().getCoordinate(), provider);
+ final ControllerServiceNode serviceNode2 = createControllerService(ServiceB.class.getName(), "2", systemBundle.getBundleDetails().getCoordinate(), provider);
setProperty(serviceNode1, ServiceA.OTHER_SERVICE.getName(), "2");
@@ -290,8 +327,7 @@ public class TestStandardControllerServiceProvider {
// But we want to ensure that the method returns successfully without throwing a StackOverflowException or anything
// like that.
nodeMap.clear();
- final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
+ final ControllerServiceNode serviceNode3 = createControllerService(ServiceA.class.getName(), "3", systemBundle.getBundleDetails().getCoordinate(), provider);
setProperty(serviceNode1, ServiceA.OTHER_SERVICE.getName(), "3");
setProperty(serviceNode3, ServiceA.OTHER_SERVICE.getName(), "1");
nodeMap.put("1", serviceNode1);
@@ -316,10 +352,8 @@ public class TestStandardControllerServiceProvider {
// Add multiple completely disparate branches.
nodeMap.clear();
setProperty(serviceNode1, ServiceA.OTHER_SERVICE.getName(), "2");
- final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
- final ControllerServiceNode serviceNode5 = provider.createControllerService(ServiceB.class.getName(), "5",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
+ final ControllerServiceNode serviceNode4 = createControllerService(ServiceB.class.getName(), "4", systemBundle.getBundleDetails().getCoordinate(), provider);
+ final ControllerServiceNode serviceNode5 = createControllerService(ServiceB.class.getName(), "5", systemBundle.getBundleDetails().getCoordinate(), provider);
setProperty(serviceNode3, ServiceA.OTHER_SERVICE.getName(), "4");
nodeMap.put("1", serviceNode1);
nodeMap.put("2", serviceNode2);
@@ -385,10 +419,14 @@ public class TestStandardControllerServiceProvider {
final LoggableComponent<Processor> dummyProcessor = new LoggableComponent<>(processor, systemBundle.getBundleDetails().getCoordinate(), null);
final ProcessorNode procNode = new StandardProcessorNode(dummyProcessor, mockInitContext.getIdentifier(),
- new StandardValidationContextFactory(serviceProvider, null), scheduler, serviceProvider, niFiProperties,
- new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent, extensionManager, new SynchronousValidationTrigger());
+ new StandardValidationContextFactory(serviceProvider, null), scheduler, serviceProvider,
+ new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent, extensionManager, new SynchronousValidationTrigger());
+
+ final FlowManager flowManager = Mockito.mock(FlowManager.class);
+ final FlowController flowController = Mockito.mock(FlowController.class );
+ Mockito.when(flowController.getFlowManager()).thenReturn(flowManager);
- final ProcessGroup group = new StandardProcessGroup(UUID.randomUUID().toString(), serviceProvider, scheduler, null, null, Mockito.mock(FlowController.class),
+ final ProcessGroup group = new StandardProcessGroup(UUID.randomUUID().toString(), serviceProvider, scheduler, null, null, flowController,
new MutableVariableRegistry(variableRegistry));
group.addProcessor(procNode);
procNode.setProcessGroup(group);
@@ -400,14 +438,16 @@ public class TestStandardControllerServiceProvider {
public void testEnableReferencingComponents() {
final ProcessGroup procGroup = new MockProcessGroup(controller);
final FlowController controller = Mockito.mock(FlowController.class);
- Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
+
+ final FlowManager flowManager = Mockito.mock(FlowManager.class);
+ Mockito.when(controller.getFlowManager()).thenReturn(flowManager);
+
+ Mockito.when(flowManager.getGroup(Mockito.anyString())).thenReturn(procGroup);
Mockito.when(controller.getExtensionManager()).thenReturn(extensionManager);
final StandardProcessScheduler scheduler = createScheduler();
- final StandardControllerServiceProvider provider =
- new StandardControllerServiceProvider(controller, null, null, stateManagerProvider, variableRegistry, niFiProperties, new SynchronousValidationTrigger());
- final ControllerServiceNode serviceNode = provider.createControllerService(ServiceA.class.getName(), "1",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
+ final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, null, null);
+ final ControllerServiceNode serviceNode = createControllerService(ServiceA.class.getName(), "1", systemBundle.getBundleDetails().getCoordinate(), provider);
final ProcessorNode procNode = createProcessor(scheduler, provider);
serviceNode.addReference(procNode);
@@ -423,26 +463,24 @@ public class TestStandardControllerServiceProvider {
@Test
public void validateEnableServices() {
+ final FlowManager flowManager = Mockito.mock(FlowManager.class);
+
StandardProcessScheduler scheduler = createScheduler();
FlowController controller = Mockito.mock(FlowController.class);
- StandardControllerServiceProvider provider =
- new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider, variableRegistry, niFiProperties, new SynchronousValidationTrigger());
+ Mockito.when(controller.getFlowManager()).thenReturn(flowManager);
+
+ StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null);
ProcessGroup procGroup = new MockProcessGroup(controller);
- Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
+
+ Mockito.when(flowManager.getGroup(Mockito.anyString())).thenReturn(procGroup);
Mockito.when(controller.getExtensionManager()).thenReturn(extensionManager);
- ControllerServiceNode A = provider.createControllerService(ServiceA.class.getName(), "A",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
- ControllerServiceNode B = provider.createControllerService(ServiceA.class.getName(), "B",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
- ControllerServiceNode C = provider.createControllerService(ServiceA.class.getName(), "C",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
- ControllerServiceNode D = provider.createControllerService(ServiceB.class.getName(), "D",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
- ControllerServiceNode E = provider.createControllerService(ServiceA.class.getName(), "E",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
- ControllerServiceNode F = provider.createControllerService(ServiceB.class.getName(), "F",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
+ ControllerServiceNode A = createControllerService(ServiceA.class.getName(), "A", systemBundle.getBundleDetails().getCoordinate(), provider);
+ ControllerServiceNode B = createControllerService(ServiceA.class.getName(), "B", systemBundle.getBundleDetails().getCoordinate(), provider);
+ ControllerServiceNode C = createControllerService(ServiceA.class.getName(), "C", systemBundle.getBundleDetails().getCoordinate(), provider);
+ ControllerServiceNode D = createControllerService(ServiceB.class.getName(), "D", systemBundle.getBundleDetails().getCoordinate(), provider);
+ ControllerServiceNode E = createControllerService(ServiceA.class.getName(), "E", systemBundle.getBundleDetails().getCoordinate(), provider);
+ ControllerServiceNode F = createControllerService(ServiceB.class.getName(), "F", systemBundle.getBundleDetails().getCoordinate(), provider);
procGroup.addControllerService(A);
procGroup.addControllerService(B);
@@ -477,24 +515,23 @@ public class TestStandardControllerServiceProvider {
*/
@Test
public void validateEnableServices2() {
+ final FlowManager flowManager = Mockito.mock(FlowManager.class);
+
StandardProcessScheduler scheduler = createScheduler();
FlowController controller = Mockito.mock(FlowController.class);
- StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null,
- stateManagerProvider, variableRegistry, niFiProperties, new SynchronousValidationTrigger());
+ Mockito.when(controller.getFlowManager()).thenReturn(flowManager);
+
+ StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null);
ProcessGroup procGroup = new MockProcessGroup(controller);
- Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
+
+ Mockito.when(flowManager.getGroup(Mockito.anyString())).thenReturn(procGroup);
Mockito.when(controller.getExtensionManager()).thenReturn(extensionManager);
- ControllerServiceNode A = provider.createControllerService(ServiceC.class.getName(), "A",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
- ControllerServiceNode B = provider.createControllerService(ServiceA.class.getName(), "B",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
- ControllerServiceNode C = provider.createControllerService(ServiceB.class.getName(), "C",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
- ControllerServiceNode D = provider.createControllerService(ServiceA.class.getName(), "D",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
- ControllerServiceNode F = provider.createControllerService(ServiceA.class.getName(), "F",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
+ ControllerServiceNode A = createControllerService(ServiceC.class.getName(), "A", systemBundle.getBundleDetails().getCoordinate(), provider);
+ ControllerServiceNode B = createControllerService(ServiceA.class.getName(), "B", systemBundle.getBundleDetails().getCoordinate(), provider);
+ ControllerServiceNode C = createControllerService(ServiceB.class.getName(), "C", systemBundle.getBundleDetails().getCoordinate(), provider);
+ ControllerServiceNode D = createControllerService(ServiceA.class.getName(), "D", systemBundle.getBundleDetails().getCoordinate(), provider);
+ ControllerServiceNode F = createControllerService(ServiceA.class.getName(), "F", systemBundle.getBundleDetails().getCoordinate(), provider);
procGroup.addControllerService(A);
procGroup.addControllerService(B);
@@ -510,7 +547,7 @@ public class TestStandardControllerServiceProvider {
setProperty(D, ServiceA.OTHER_SERVICE.getName(), "C");
final List<ControllerServiceNode> services = Arrays.asList(C, F, A, B, D);
- services.stream().forEach(ControllerServiceNode::performValidation);
+ services.forEach(ControllerServiceNode::performValidation);
provider.enableControllerServices(services);
@@ -523,28 +560,25 @@ public class TestStandardControllerServiceProvider {
@Test
public void validateEnableServicesWithDisabledMissingService() {
+ final FlowManager flowManager = Mockito.mock(FlowManager.class);
+
StandardProcessScheduler scheduler = createScheduler();
FlowController controller = Mockito.mock(FlowController.class);
- StandardControllerServiceProvider provider =
- new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider, variableRegistry, niFiProperties, new SynchronousValidationTrigger());
+ Mockito.when(controller.getFlowManager()).thenReturn(flowManager);
+
+ StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null);
ProcessGroup procGroup = new MockProcessGroup(controller);
- Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
+
+ Mockito.when(flowManager.getGroup(Mockito.anyString())).thenReturn(procGroup);
Mockito.when(controller.getExtensionManager()).thenReturn(extensionManager);
- ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
- ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
- ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
- ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
- ControllerServiceNode serviceNode5 = provider.createControllerService(ServiceA.class.getName(), "5",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
- ControllerServiceNode serviceNode6 = provider.createControllerService(ServiceB.class.getName(), "6",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
- ControllerServiceNode serviceNode7 = provider.createControllerService(ServiceC.class.getName(), "7",
- systemBundle.getBundleDetails().getCoordinate(), null, false);
+ ControllerServiceNode serviceNode1 = createControllerService(ServiceA.class.getName(), "1", systemBundle.getBundleDetails().getCoordinate(), provider);
+ ControllerServiceNode serviceNode2 = createControllerService(ServiceA.class.getName(), "2", systemBundle.getBundleDetails().getCoordinate(), provider);
+ ControllerServiceNode serviceNode3 = createControllerService(ServiceA.class.getName(), "3", systemBundle.getBundleDetails().getCoordinate(), provider);
+ ControllerServiceNode serviceNode4 = createControllerService(ServiceB.class.getName(), "4", systemBundle.getBundleDetails().getCoordinate(), provider);
+ ControllerServiceNode serviceNode5 = createControllerService(ServiceA.class.getName(), "5", systemBundle.getBundleDetails().getCoordinate(), provider);
+ ControllerServiceNode serviceNode6 = createControllerService(ServiceB.class.getName(), "6", systemBundle.getBundleDetails().getCoordinate(), provider);
+ ControllerServiceNode serviceNode7 = createControllerService(ServiceC.class.getName(), "7", systemBundle.getBundleDetails().getCoordinate(), provider);
procGroup.addControllerService(serviceNode1);
procGroup.addControllerService(serviceNode2);
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
index ec2caef..9387269 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
@@ -277,13 +277,17 @@ public class MockProcessGroup implements ProcessGroup {
public void addProcessor(final ProcessorNode processor) {
processor.setProcessGroup(this);
processorMap.put(processor.getIdentifier(), processor);
- flowController.onProcessorAdded(processor);
+ if (flowController.getFlowManager() != null) {
+ flowController.getFlowManager().onProcessorAdded(processor);
+ }
}
@Override
public void removeProcessor(final ProcessorNode processor) {
processorMap.remove(processor.getIdentifier());
- flowController.onProcessorRemoved(processor);
+ if (flowController.getFlowManager() != null) {
+ flowController.getFlowManager().onProcessorRemoved(processor);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java
index e7a3d87..5ca7601 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java
@@ -428,7 +428,7 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering
@Override
public void closeURLClassLoader(final String instanceIdentifier, final ClassLoader classLoader) {
- if (classLoader != null && (classLoader instanceof URLClassLoader)) {
+ if ((classLoader instanceof URLClassLoader)) {
final URLClassLoader urlClassLoader = (URLClassLoader) classLoader;
try {
urlClassLoader.close();
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
index b9cbbd0..dcc8493 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
@@ -34,8 +34,8 @@ import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.components.ConfigurableComponent;
-import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.registry.bucket.Bucket;
import org.apache.nifi.registry.flow.FlowRegistryUtils;
@@ -474,7 +474,7 @@ public class VersionsResource extends ApplicationResource {
}
// ensure we're not attempting to version the root group
- final ProcessGroupEntity root = serviceFacade.getProcessGroup(FlowController.ROOT_GROUP_ID_ALIAS);
+ final ProcessGroupEntity root = serviceFacade.getProcessGroup(FlowManager.ROOT_GROUP_ID_ALIAS);
if (root.getId().equals(groupId)) {
throw new IllegalArgumentException("The Root Process Group cannot be versioned.");
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 562d6bc..fd21240 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -3276,7 +3276,8 @@ public final class DtoFactory {
procDiagnostics.setProcessorStatus(createProcessorStatusDto(procStatus));
procDiagnostics.setThreadDumps(createThreadDumpDtos(procNode));
- final Set<ControllerServiceDiagnosticsDTO> referencedServiceDiagnostics = createReferencedServiceDiagnostics(procNode.getProperties(), flowController, serviceEntityFactory);
+ final Set<ControllerServiceDiagnosticsDTO> referencedServiceDiagnostics = createReferencedServiceDiagnostics(procNode.getProperties(),
+ flowController.getControllerServiceProvider(), serviceEntityFactory);
procDiagnostics.setReferencedControllerServices(referencedServiceDiagnostics);
return procDiagnostics;
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index 88b13a0..745325b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -45,6 +45,7 @@ import org.apache.nifi.controller.FlowController.GroupStatusCounts;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.Template;
+import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.repository.ContentNotFoundException;
import org.apache.nifi.controller.repository.claim.ContentDirection;
@@ -116,7 +117,6 @@ import java.text.Collator;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
@@ -133,8 +133,6 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
-import static org.apache.nifi.controller.FlowController.ROOT_GROUP_ID_ALIAS;
-
public class ControllerFacade implements Authorizable {
private static final Logger logger = LoggerFactory.getLogger(ControllerFacade.class);
@@ -150,6 +148,10 @@ public class ControllerFacade implements Authorizable {
private VariableRegistry variableRegistry;
private ControllerSearchService controllerSearchService;
+ private ProcessGroup getRootGroup() {
+ return flowController.getFlowManager().getRootGroup();
+ }
+
/**
* Returns the group id that contains the specified processor.
*
@@ -157,7 +159,7 @@ public class ControllerFacade implements Authorizable {
* @return group id
*/
public String findProcessGroupIdForProcessor(String processorId) {
- final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
+ final ProcessGroup rootGroup = getRootGroup();
final ProcessorNode processor = rootGroup.findProcessor(processorId);
if (processor == null) {
return null;
@@ -167,11 +169,11 @@ public class ControllerFacade implements Authorizable {
}
public Connectable findLocalConnectable(String componentId) {
- return flowController.findLocalConnectable(componentId);
+ return flowController.getFlowManager().findConnectable(componentId);
}
public ControllerServiceProvider getControllerServiceProvider() {
- return flowController;
+ return flowController.getControllerServiceProvider();
}
public ExtensionManager getExtensionManager() {
@@ -184,7 +186,7 @@ public class ControllerFacade implements Authorizable {
* @param name name
*/
public void setName(String name) {
- flowController.setName(name);
+ getRootGroup().setName(name);
}
@Override
@@ -203,7 +205,7 @@ public class ControllerFacade implements Authorizable {
* @param comments comments
*/
public void setComments(String comments) {
- flowController.setComments(comments);
+ getRootGroup().setComments(comments);
}
/**
@@ -250,7 +252,7 @@ public class ControllerFacade implements Authorizable {
* @return group id
*/
public String getRootGroupId() {
- return flowController.getRootGroupId();
+ return flowController.getFlowManager().getRootGroupId();
}
/**
@@ -260,7 +262,7 @@ public class ControllerFacade implements Authorizable {
*/
public Set<RootGroupPort> getInputPorts() {
final Set<RootGroupPort> inputPorts = new HashSet<>();
- ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
+ ProcessGroup rootGroup = getRootGroup();
for (final Port port : rootGroup.getInputPorts()) {
if (port instanceof RootGroupPort) {
inputPorts.add((RootGroupPort) port);
@@ -276,7 +278,7 @@ public class ControllerFacade implements Authorizable {
*/
public Set<RootGroupPort> getOutputPorts() {
final Set<RootGroupPort> outputPorts = new HashSet<>();
- ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
+ ProcessGroup rootGroup = getRootGroup();
for (final Port port : rootGroup.getOutputPorts()) {
if (port instanceof RootGroupPort) {
outputPorts.add((RootGroupPort) port);
@@ -292,7 +294,7 @@ public class ControllerFacade implements Authorizable {
* @return status history
*/
public StatusHistoryDTO getProcessorStatusHistory(final String processorId) {
- final ProcessGroup root = flowController.getGroup(flowController.getRootGroupId());
+ final ProcessGroup root = getRootGroup();
final ProcessorNode processor = root.findProcessor(processorId);
// ensure the processor was found
@@ -320,7 +322,7 @@ public class ControllerFacade implements Authorizable {
* @return status history
*/
public StatusHistoryDTO getConnectionStatusHistory(final String connectionId) {
- final ProcessGroup root = flowController.getGroup(flowController.getRootGroupId());
+ final ProcessGroup root = getRootGroup();
final Connection connection = root.findConnection(connectionId);
// ensure the connection was found
@@ -347,8 +349,10 @@ public class ControllerFacade implements Authorizable {
* @return status history
*/
public StatusHistoryDTO getProcessGroupStatusHistory(final String groupId) {
- final String searchId = groupId.equals(ROOT_GROUP_ID_ALIAS) ? flowController.getRootGroupId() : groupId;
- final ProcessGroup root = flowController.getGroup(flowController.getRootGroupId());
+ final FlowManager flowManager = flowController.getFlowManager();
+
+ final String searchId = groupId.equals(FlowManager.ROOT_GROUP_ID_ALIAS) ? flowManager.getRootGroupId() : groupId;
+ final ProcessGroup root = flowManager.getRootGroup();
final ProcessGroup group = root.findProcessGroup(searchId);
// ensure the processor was found
@@ -373,7 +377,7 @@ public class ControllerFacade implements Authorizable {
* @return status history
*/
public StatusHistoryDTO getRemoteProcessGroupStatusHistory(final String remoteProcessGroupId) {
- final ProcessGroup root = flowController.getGroup(flowController.getRootGroupId());
+ final ProcessGroup root = getRootGroup();
final RemoteProcessGroup remoteProcessGroup = root.findRemoteProcessGroup(remoteProcessGroupId);
// ensure the output port was found
@@ -414,7 +418,7 @@ public class ControllerFacade implements Authorizable {
* @return name
*/
public String getName() {
- return flowController.getName();
+ return getRootGroup().getName();
}
public String getInstanceId() {
@@ -427,7 +431,7 @@ public class ControllerFacade implements Authorizable {
* @return comments
*/
public String getComments() {
- return flowController.getComments();
+ return getRootGroup().getComments();
}
/**
@@ -580,7 +584,7 @@ public class ControllerFacade implements Authorizable {
* @return the status of this controller
*/
public ControllerStatusDTO getControllerStatus() {
- final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
+ final ProcessGroup rootGroup = getRootGroup();
final GroupStatusCounts groupStatusCounts = flowController.getGroupStatusCounts(rootGroup);
final ControllerStatusDTO controllerStatus = new ControllerStatusDTO();
@@ -624,7 +628,7 @@ public class ControllerFacade implements Authorizable {
* @return the status for the specified process group
*/
public ProcessGroupStatus getProcessGroupStatus(final String groupId, final int recursiveStatusDepth) {
- final ProcessGroupStatus processGroupStatus = flowController.getGroupStatus(groupId, NiFiUserUtils.getNiFiUser(), recursiveStatusDepth);
+ final ProcessGroupStatus processGroupStatus = flowController.getEventAccess().getGroupStatus(groupId, NiFiUserUtils.getNiFiUser(), recursiveStatusDepth);
if (processGroupStatus == null) {
throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
}
@@ -639,7 +643,7 @@ public class ControllerFacade implements Authorizable {
* @return the status for the specified processor
*/
public ProcessorStatus getProcessorStatus(final String processorId) {
- final ProcessGroup root = flowController.getGroup(flowController.getRootGroupId());
+ final ProcessGroup root = getRootGroup();
final ProcessorNode processor = root.findProcessor(processorId);
// ensure the processor was found
@@ -649,7 +653,7 @@ public class ControllerFacade implements Authorizable {
// calculate the process group status
final String groupId = processor.getProcessGroup().getIdentifier();
- final ProcessGroupStatus processGroupStatus = flowController.getGroupStatus(groupId, NiFiUserUtils.getNiFiUser(), 1);
+ final ProcessGroupStatus processGroupStatus = flowController.getEventAccess().getGroupStatus(groupId, NiFiUserUtils.getNiFiUser(), 1);
if (processGroupStatus == null) {
throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
}
@@ -669,7 +673,7 @@ public class ControllerFacade implements Authorizable {
* @return the status for the specified connection
*/
public ConnectionStatus getConnectionStatus(final String connectionId) {
- final ProcessGroup root = flowController.getGroup(flowController.getRootGroupId());
+ final ProcessGroup root = getRootGroup();
final Connection connection = root.findConnection(connectionId);
// ensure the connection was found
@@ -679,7 +683,7 @@ public class ControllerFacade implements Authorizable {
// calculate the process group status
final String groupId = connection.getProcessGroup().getIdentifier();
- final ProcessGroupStatus processGroupStatus = flowController.getGroupStatus(groupId, NiFiUserUtils.getNiFiUser(), 1);
+ final ProcessGroupStatus processGroupStatus = flowController.getEventAccess().getGroupStatus(groupId, NiFiUserUtils.getNiFiUser(), 1);
if (processGroupStatus == null) {
throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
}
@@ -699,7 +703,7 @@ public class ControllerFacade implements Authorizable {
* @return the status for the specified input port
*/
public PortStatus getInputPortStatus(final String portId) {
- final ProcessGroup root = flowController.getGroup(flowController.getRootGroupId());
+ final ProcessGroup root = getRootGroup();
final Port port = root.findInputPort(portId);
// ensure the input port was found
@@ -708,7 +712,7 @@ public class ControllerFacade implements Authorizable {
}
final String groupId = port.getProcessGroup().getIdentifier();
- final ProcessGroupStatus processGroupStatus = flowController.getGroupStatus(groupId, NiFiUserUtils.getNiFiUser(), 1);
+ final ProcessGroupStatus processGroupStatus = flowController.getEventAccess().getGroupStatus(groupId, NiFiUserUtils.getNiFiUser(), 1);
if (processGroupStatus == null) {
throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
}
@@ -728,7 +732,7 @@ public class ControllerFacade implements Authorizable {
* @return the status for the specified output port
*/
public PortStatus getOutputPortStatus(final String portId) {
- final ProcessGroup root = flowController.getGroup(flowController.getRootGroupId());
+ final ProcessGroup root = getRootGroup();
final Port port = root.findOutputPort(portId);
// ensure the output port was found
@@ -737,7 +741,7 @@ public class ControllerFacade implements Authorizable {
}
final String groupId = port.getProcessGroup().getIdentifier();
- final ProcessGroupStatus processGroupStatus = flowController.getGroupStatus(groupId, NiFiUserUtils.getNiFiUser(), 1);
+ final ProcessGroupStatus processGroupStatus = flowController.getEventAccess().getGroupStatus(groupId, NiFiUserUtils.getNiFiUser(), 1);
if (processGroupStatus == null) {
throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
}
@@ -757,7 +761,7 @@ public class ControllerFacade implements Authorizable {
* @return the status for the specified remote process group
*/
public RemoteProcessGroupStatus getRemoteProcessGroupStatus(final String remoteProcessGroupId) {
- final ProcessGroup root = flowController.getGroup(flowController.getRootGroupId());
+ final ProcessGroup root = getRootGroup();
final RemoteProcessGroup remoteProcessGroup = root.findRemoteProcessGroup(remoteProcessGroupId);
// ensure the output port was found
@@ -766,7 +770,7 @@ public class ControllerFacade implements Authorizable {
}
final String groupId = remoteProcessGroup.getProcessGroup().getIdentifier();
- final ProcessGroupStatus groupStatus = flowController.getGroupStatus(groupId, NiFiUserUtils.getNiFiUser(), 1);
+ final ProcessGroupStatus groupStatus = flowController.getEventAccess().getGroupStatus(groupId, NiFiUserUtils.getNiFiUser(), 1);
if (groupStatus == null) {
throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
}
@@ -849,7 +853,7 @@ public class ControllerFacade implements Authorizable {
resources.add(ResourceFactory.getRestrictedComponentsResource());
Arrays.stream(RequiredPermission.values()).forEach(requiredPermission -> resources.add(ResourceFactory.getRestrictedComponentsResource(requiredPermission)));
- final ProcessGroup root = flowController.getGroup(flowController.getRootGroupId());
+ final ProcessGroup root = getRootGroup();
// include the root group
final Resource rootResource = root.getResource();
@@ -929,7 +933,7 @@ public class ControllerFacade implements Authorizable {
resources.add(ResourceFactory.getOperationResource(controllerServiceResource));
};
- flowController.getAllControllerServices().forEach(csConsumer);
+ flowController.getFlowManager().getAllControllerServices().forEach(csConsumer);
root.findAllControllerServices().forEach(csConsumer);
@@ -1245,12 +1249,7 @@ public class ControllerFacade implements Authorizable {
}
// authorize the event
- final Authorizable dataAuthorizable;
- if (event.isRemotePortType()) {
- dataAuthorizable = flowController.createRemoteDataAuthorizable(event.getComponentId());
- } else {
- dataAuthorizable = flowController.createLocalDataAuthorizable(event.getComponentId());
- }
+ final Authorizable dataAuthorizable = getDataAuthorizable(event);
dataAuthorizable.authorize(authorizer, RequestAction.READ, user, attributes);
// get the filename and fall back to the identifier (should never happen)
@@ -1273,6 +1272,14 @@ public class ControllerFacade implements Authorizable {
}
}
+ private Authorizable getDataAuthorizable(final ProvenanceEventRecord event) {
+ if (event.isRemotePortType()) {
+ return flowController.getProvenanceAuthorizableFactory().createRemoteDataAuthorizable(event.getComponentId());
+ } else {
+ return flowController.getProvenanceAuthorizableFactory().createLocalDataAuthorizable(event.getComponentId());
+ }
+ }
+
/**
* Submits a replay request for the specified event id.
*
@@ -1320,12 +1327,7 @@ public class ControllerFacade implements Authorizable {
}
final NiFiUser user = NiFiUserUtils.getNiFiUser();
- final Authorizable dataAuthorizable;
- if (event.isRemotePortType()) {
- dataAuthorizable = flowController.createRemoteDataAuthorizable(event.getComponentId());
- } else {
- dataAuthorizable = flowController.createLocalDataAuthorizable(event.getComponentId());
- }
+ final Authorizable dataAuthorizable = getDataAuthorizable(event);
final Map<String, String> eventAttributes = event.getAttributes();
@@ -1353,12 +1355,7 @@ public class ControllerFacade implements Authorizable {
}
final NiFiUser user = NiFiUserUtils.getNiFiUser();
- final Authorizable dataAuthorizable;
- if (event.isRemotePortType()) {
- dataAuthorizable = flowController.createRemoteDataAuthorizable(event.getComponentId());
- } else {
- dataAuthorizable = flowController.createLocalDataAuthorizable(event.getComponentId());
- }
+ final Authorizable dataAuthorizable = getDataAuthorizable(event);
// ensure we can read and write the data
final Map<String, String> eventAttributes = event.getAttributes();
@@ -1373,12 +1370,7 @@ public class ControllerFacade implements Authorizable {
*/
private AuthorizationResult checkAuthorizationForData(ProvenanceEventRecord event) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
- final Authorizable dataAuthorizable;
- if (event.isRemotePortType()) {
- dataAuthorizable = flowController.createRemoteDataAuthorizable(event.getComponentId());
- } else {
- dataAuthorizable = flowController.createLocalDataAuthorizable(event.getComponentId());
- }
+ final Authorizable dataAuthorizable = getDataAuthorizable(event);
final Map<String, String> eventAttributes = event.getAttributes();
@@ -1525,12 +1517,12 @@ public class ControllerFacade implements Authorizable {
// parent uuids
final List<String> parentUuids = new ArrayList<>(event.getParentUuids());
- Collections.sort(parentUuids, Collator.getInstance(Locale.US));
+ parentUuids.sort(Collator.getInstance(Locale.US));
dto.setParentUuids(parentUuids);
// child uuids
final List<String> childUuids = new ArrayList<>(event.getChildUuids());
- Collections.sort(childUuids, Collator.getInstance(Locale.US));
+ childUuids.sort(Collator.getInstance(Locale.US));
dto.setChildUuids(childUuids);
}
@@ -1542,7 +1534,7 @@ public class ControllerFacade implements Authorizable {
private void setComponentDetails(final ProvenanceEventDTO dto) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
- final ProcessGroup root = flowController.getGroup(flowController.getRootGroupId());
+ final ProcessGroup root = getRootGroup();
final Connectable connectable = findLocalConnectable(dto.getComponentId());
if (connectable != null) {
@@ -1584,7 +1576,7 @@ public class ControllerFacade implements Authorizable {
String name = connection.getName();
final Collection<Relationship> relationships = connection.getRelationships();
if (StringUtils.isBlank(name) && CollectionUtils.isNotEmpty(relationships)) {
- name = StringUtils.join(relationships.stream().map(relationship -> relationship.getName()).collect(Collectors.toSet()), ", ");
+ name = StringUtils.join(relationships.stream().map(Relationship::getName).collect(Collectors.toSet()), ", ");
}
dto.setComponentName(name);
} else {
@@ -1601,7 +1593,7 @@ public class ControllerFacade implements Authorizable {
* @return result
*/
public SearchResultsDTO search(final String search) {
- final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
+ final ProcessGroup rootGroup = getRootGroup();
final SearchResultsDTO results = new SearchResultsDTO();
controllerSearchService.search(results, search, rootGroup);
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerSearchService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerSearchService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerSearchService.java
index f1a90c6..10e18c6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerSearchService.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerSearchService.java
@@ -289,7 +289,7 @@ public class ControllerSearchService {
if (processor instanceof Searchable) {
final Searchable searchable = (Searchable) processor;
- final SearchContext context = new StandardSearchContext(searchStr, procNode, flowController, variableRegistry);
+ final SearchContext context = new StandardSearchContext(searchStr, procNode, flowController.getControllerServiceProvider(), variableRegistry);
// search the processor using the appropriate thread context classloader
try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), processor.getClass(), processor.getIdentifier())) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/ComponentDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/ComponentDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/ComponentDAO.java
index aaec17a..078644a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/ComponentDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/ComponentDAO.java
@@ -64,7 +64,7 @@ public abstract class ComponentDAO {
* @return group
*/
protected ProcessGroup locateProcessGroup(FlowController flowController, String groupId) {
- ProcessGroup group = flowController.getGroup(groupId);
+ ProcessGroup group = flowController.getFlowManager().getGroup(groupId);
if (group == null) {
throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
index 062f151..c3f5788 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
@@ -71,7 +71,7 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
private Authorizer authorizer;
private Connection locateConnection(final String connectionId) {
- final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
+ final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
final Connection connection = rootGroup.findConnection(connectionId);
if (connection == null) {
@@ -83,7 +83,7 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
@Override
public boolean hasConnection(String id) {
- final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
+ final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
return rootGroup.findConnection(id) != null;
}
@@ -159,7 +159,7 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
newPrioritizers = new ArrayList<>();
for (final String className : newPrioritizersClasses) {
try {
- newPrioritizers.add(flowController.createPrioritizer(className));
+ newPrioritizers.add(flowController.getFlowManager().createPrioritizer(className));
} catch (final ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new IllegalArgumentException("Unable to set prioritizer " + className + ": " + e);
}
@@ -267,7 +267,7 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
public Connection createConnection(final String groupId, final ConnectionDTO connectionDTO) {
final ProcessGroup group = locateProcessGroup(flowController, groupId);
- if (isNotNull(connectionDTO.getParentGroupId()) && !flowController.areGroupsSame(connectionDTO.getParentGroupId(), groupId)) {
+ if (isNotNull(connectionDTO.getParentGroupId()) && !flowController.getFlowManager().areGroupsSame(connectionDTO.getParentGroupId(), groupId)) {
throw new IllegalStateException("Cannot specify a different Parent Group ID than the Group to which the Connection is being added");
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
index 2995bae..3d09546 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
@@ -16,8 +16,6 @@
*/
package org.apache.nifi.web.dao.impl;
-import static org.apache.nifi.controller.FlowController.ROOT_GROUP_ID_ALIAS;
-
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.state.Scope;
@@ -27,6 +25,7 @@ import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
import org.apache.nifi.controller.exception.ValidationException;
+import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.ControllerServiceState;
@@ -80,9 +79,10 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
try {
// create the controller service
final ExtensionManager extensionManager = serviceProvider.getExtensionManager();
+ final FlowManager flowManager = flowController.getFlowManager();
final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(extensionManager, controllerServiceDTO.getType(), controllerServiceDTO.getBundle());
- final ControllerServiceNode controllerService = serviceProvider.createControllerService(
- controllerServiceDTO.getType(), controllerServiceDTO.getId(), bundleCoordinate, Collections.emptySet(), true);
+ final ControllerServiceNode controllerService = flowManager.createControllerService(controllerServiceDTO.getType(), controllerServiceDTO.getId(), bundleCoordinate,
+ Collections.emptySet(), true, true);
// ensure we can perform the update
verifyUpdate(controllerService, controllerServiceDTO);
@@ -92,13 +92,13 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
final String groupId = controllerServiceDTO.getParentGroupId();
if (groupId == null) {
- flowController.addRootControllerService(controllerService);
+ flowManager.addRootControllerService(controllerService);
} else {
final ProcessGroup group;
- if (groupId.equals(ROOT_GROUP_ID_ALIAS)) {
- group = flowController.getGroup(flowController.getRootGroupId());
+ if (groupId.equals(FlowManager.ROOT_GROUP_ID_ALIAS)) {
+ group = flowManager.getRootGroup();
} else {
- group = flowController.getGroup(flowController.getRootGroupId()).findProcessGroup(groupId);
+ group = flowManager.getRootGroup().findProcessGroup(groupId);
}
if (group == null) {
@@ -126,11 +126,13 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
@Override
public Set<ControllerServiceNode> getControllerServices(final String groupId, final boolean includeAncestorGroups, final boolean includeDescendantGroups) {
+ final FlowManager flowManager = flowController.getFlowManager();
+
if (groupId == null) {
- return flowController.getRootControllerServices();
+ return flowManager.getRootControllerServices();
} else {
- final String searchId = groupId.equals(ROOT_GROUP_ID_ALIAS) ? flowController.getRootGroupId() : groupId;
- final ProcessGroup procGroup = flowController.getGroup(flowController.getRootGroupId()).findProcessGroup(searchId);
+ final String searchId = groupId.equals(FlowManager.ROOT_GROUP_ID_ALIAS) ? flowManager.getRootGroupId() : groupId;
+ final ProcessGroup procGroup = flowManager.getRootGroup().findProcessGroup(searchId);
if (procGroup == null) {
throw new ResourceNotFoundException("Could not find Process Group with ID " + groupId);
}
@@ -205,7 +207,7 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
// we need to use the property descriptors from the temp component here in case we are changing from a ghost component to a real component
final ConfigurableComponent tempComponent = extensionManager.getTempComponent(controllerService.getCanonicalClassName(), incomingCoordinate);
final Set<URL> additionalUrls = controllerService.getAdditionalClasspathResources(tempComponent.getPropertyDescriptors());
- flowController.reload(controllerService, controllerService.getCanonicalClassName(), incomingCoordinate, additionalUrls);
+ flowController.getReloadComponent().reload(controllerService, controllerService.getCanonicalClassName(), incomingCoordinate, additionalUrls);
} catch (ControllerServiceInstantiationException e) {
throw new NiFiCoreException(String.format("Unable to update controller service %s from %s to %s due to: %s",
controllerServiceDTO.getId(), controllerService.getBundleCoordinate().getCoordinate(), incomingCoordinate.getCoordinate(), e.getMessage()), e);
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFunnelDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFunnelDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFunnelDAO.java
index 60426c0..ddc5b4f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFunnelDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFunnelDAO.java
@@ -31,7 +31,7 @@ public class StandardFunnelDAO extends ComponentDAO implements FunnelDAO {
private FlowController flowController;
private Funnel locateFunnel(final String funnelId) {
- final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
+ final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
final Funnel funnel = rootGroup.findFunnel(funnelId);
if (funnel == null) {
@@ -43,13 +43,13 @@ public class StandardFunnelDAO extends ComponentDAO implements FunnelDAO {
@Override
public boolean hasFunnel(String funnelId) {
- final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
+ final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
return rootGroup.findFunnel(funnelId) != null;
}
@Override
public Funnel createFunnel(String groupId, FunnelDTO funnelDTO) {
- if (funnelDTO.getParentGroupId() != null && !flowController.areGroupsSame(groupId, funnelDTO.getParentGroupId())) {
+ if (funnelDTO.getParentGroupId() != null && !flowController.getFlowManager().areGroupsSame(groupId, funnelDTO.getParentGroupId())) {
throw new IllegalArgumentException("Cannot specify a different Parent Group ID than the Group to which the Funnel is being added.");
}
@@ -57,7 +57,7 @@ public class StandardFunnelDAO extends ComponentDAO implements FunnelDAO {
ProcessGroup group = locateProcessGroup(flowController, groupId);
// create the funnel
- Funnel funnel = flowController.createFunnel(funnelDTO.getId());
+ Funnel funnel = flowController.getFlowManager().createFunnel(funnelDTO.getId());
if (funnelDTO.getPosition() != null) {
funnel.setPosition(new Position(funnelDTO.getPosition().getX(), funnelDTO.getPosition().getY()));
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java
index 2d47720..c08cb70 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java
@@ -37,7 +37,7 @@ public class StandardInputPortDAO extends ComponentDAO implements PortDAO {
private FlowController flowController;
private Port locatePort(final String portId) {
- final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
+ final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
Port port = rootGroup.findInputPort(portId);
if (port == null) {
@@ -53,13 +53,13 @@ public class StandardInputPortDAO extends ComponentDAO implements PortDAO {
@Override
public boolean hasPort(String portId) {
- final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
+ final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
return rootGroup.findInputPort(portId) != null || rootGroup.findOutputPort(portId) != null;
}
@Override
public Port createPort(String groupId, PortDTO portDTO) {
- if (isNotNull(portDTO.getParentGroupId()) && !flowController.areGroupsSame(groupId, portDTO.getParentGroupId())) {
+ if (isNotNull(portDTO.getParentGroupId()) && !flowController.getFlowManager().areGroupsSame(groupId, portDTO.getParentGroupId())) {
throw new IllegalArgumentException("Cannot specify a different Parent Group ID than the Group to which the InputPort is being added.");
}
@@ -74,9 +74,9 @@ public class StandardInputPortDAO extends ComponentDAO implements PortDAO {
// determine if this is the root group
Port port;
if (group.getParent() == null) {
- port = flowController.createRemoteInputPort(portDTO.getId(), portDTO.getName());
+ port = flowController.getFlowManager().createRemoteInputPort(portDTO.getId(), portDTO.getName());
} else {
- port = flowController.createLocalInputPort(portDTO.getId(), portDTO.getName());
+ port = flowController.getFlowManager().createLocalInputPort(portDTO.getId(), portDTO.getName());
}
// ensure we can perform the update before we add the processor to the flow
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardLabelDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardLabelDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardLabelDAO.java
index b8105e6..04430e8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardLabelDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardLabelDAO.java
@@ -32,7 +32,7 @@ public class StandardLabelDAO extends ComponentDAO implements LabelDAO {
private FlowController flowController;
private Label locateLabel(final String labelId) {
- final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
+ final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
final Label label = rootGroup.findLabel(labelId);
if (label == null) {
@@ -44,13 +44,13 @@ public class StandardLabelDAO extends ComponentDAO implements LabelDAO {
@Override
public boolean hasLabel(String labelId) {
- final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
+ final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
return rootGroup.findLabel(labelId) != null;
}
@Override
public Label createLabel(String groupId, LabelDTO labelDTO) {
- if (labelDTO.getParentGroupId() != null && !flowController.areGroupsSame(groupId, labelDTO.getParentGroupId())) {
+ if (labelDTO.getParentGroupId() != null && !flowController.getFlowManager().areGroupsSame(groupId, labelDTO.getParentGroupId())) {
throw new IllegalArgumentException("Cannot specify a different Parent Group ID than the Group to which the Label is being added.");
}
@@ -58,7 +58,7 @@ public class StandardLabelDAO extends ComponentDAO implements LabelDAO {
ProcessGroup group = locateProcessGroup(flowController, groupId);
// create the label
- Label label = flowController.createLabel(labelDTO.getId(), labelDTO.getLabel());
+ Label label = flowController.getFlowManager().createLabel(labelDTO.getId(), labelDTO.getLabel());
if (labelDTO.getPosition() != null) {
label.setPosition(new Position(labelDTO.getPosition().getX(), labelDTO.getPosition().getY()));
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardOutputPortDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardOutputPortDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardOutputPortDAO.java
index 72bc49b..f4eea8a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardOutputPortDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardOutputPortDAO.java
@@ -37,7 +37,7 @@ public class StandardOutputPortDAO extends ComponentDAO implements PortDAO {
private FlowController flowController;
private Port locatePort(final String portId) {
- final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
+ final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
final Port port = rootGroup.findOutputPort(portId);
if (port == null) {
@@ -49,13 +49,13 @@ public class StandardOutputPortDAO extends ComponentDAO implements PortDAO {
@Override
public boolean hasPort(String portId) {
- final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
+ final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
return rootGroup.findOutputPort(portId) != null;
}
@Override
public Port createPort(String groupId, PortDTO portDTO) {
- if (isNotNull(portDTO.getParentGroupId()) && !flowController.areGroupsSame(groupId, portDTO.getParentGroupId())) {
+ if (isNotNull(portDTO.getParentGroupId()) && !flowController.getFlowManager().areGroupsSame(groupId, portDTO.getParentGroupId())) {
throw new IllegalArgumentException("Cannot specify a different Parent Group ID than the Group to which the OutputPort is being added.");
}
@@ -70,9 +70,9 @@ public class StandardOutputPortDAO extends ComponentDAO implements PortDAO {
// determine if this is the root group
Port port;
if (group.getParent() == null) {
- port = flowController.createRemoteOutputPort(portDTO.getId(), portDTO.getName());
+ port = flowController.getFlowManager().createRemoteOutputPort(portDTO.getId(), portDTO.getName());
} else {
- port = flowController.createLocalOutputPort(portDTO.getId(), portDTO.getName());
+ port = flowController.getFlowManager().createLocalOutputPort(portDTO.getId(), portDTO.getName());
}
// ensure we can perform the update before we add the processor to the flow
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
index 47e9855..c2a180a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
@@ -22,6 +22,7 @@ import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.groups.ProcessGroup;
@@ -58,7 +59,8 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
@Override
public ProcessGroup createProcessGroup(String parentGroupId, ProcessGroupDTO processGroup) {
- if (processGroup.getParentGroupId() != null && !flowController.areGroupsSame(processGroup.getParentGroupId(), parentGroupId)) {
+ final FlowManager flowManager = flowController.getFlowManager();
+ if (processGroup.getParentGroupId() != null && !flowManager.areGroupsSame(processGroup.getParentGroupId(), parentGroupId)) {
throw new IllegalArgumentException("Cannot specify a different Parent Group ID than the Group to which the Process Group is being added.");
}
@@ -66,7 +68,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
ProcessGroup parentGroup = locateProcessGroup(flowController, parentGroupId);
// create the process group
- ProcessGroup group = flowController.createProcessGroup(processGroup.getId());
+ ProcessGroup group = flowManager.createProcessGroup(processGroup.getId());
if (processGroup.getName() != null) {
group.setName(processGroup.getName());
}
@@ -83,7 +85,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
@Override
public boolean hasProcessGroup(String groupId) {
- return flowController.getGroup(groupId) != null;
+ return flowController.getFlowManager().getGroup(groupId) != null;
}
@Override
@@ -156,8 +158,10 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
@Override
public void verifyActivateControllerServices(final ControllerServiceState state, final Collection<String> serviceIds) {
+ final FlowManager flowManager = flowController.getFlowManager();
+
final Set<ControllerServiceNode> serviceNodes = serviceIds.stream()
- .map(flowController::getControllerServiceNode)
+ .map(flowManager::getControllerServiceNode)
.collect(Collectors.toSet());
for (final ControllerServiceNode serviceNode : serviceNodes) {
@@ -174,7 +178,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
// We do this, rather than calling ProcessGroup.findLocalConnectable because for any component that is buried several
// layers of Process Groups deep, that method becomes quite a bit more expensive than this method, due to all of the
// Read Locks that must be obtained while recursing through the Process Group's descendant groups.
- final Connectable connectable = flowController.findLocalConnectable(componentId);
+ final Connectable connectable = flowController.getFlowManager().findConnectable(componentId);
if (connectable == null) {
throw new ResourceNotFoundException("Could not find Component with ID " + componentId);
}
@@ -283,14 +287,15 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
@Override
public Future<Void> activateControllerServices(final String groupId, final ControllerServiceState state, final Collection<String> serviceIds) {
+ final FlowManager flowManager = flowController.getFlowManager();
final List<ControllerServiceNode> serviceNodes = serviceIds.stream()
- .map(flowController::getControllerServiceNode)
+ .map(flowManager::getControllerServiceNode)
.collect(Collectors.toList());
if (state == ControllerServiceState.ENABLED) {
- return flowController.enableControllerServicesAsync(serviceNodes);
+ return flowController.getControllerServiceProvider().enableControllerServicesAsync(serviceNodes);
} else {
- return flowController.disableControllerServicesAsync(serviceNodes);
+ return flowController.getControllerServiceProvider().disableControllerServicesAsync(serviceNodes);
}
}
@@ -329,7 +334,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
final String registryName = flowRegistry == null ? registryId : flowRegistry.getName();
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(flowController.getExtensionManager());
- final VersionedProcessGroup flowSnapshot = mapper.mapProcessGroup(group, flowController, flowController.getFlowRegistryClient(), false);
+ final VersionedProcessGroup flowSnapshot = mapper.mapProcessGroup(group, flowController.getControllerServiceProvider(), flowController.getFlowRegistryClient(), false);
final StandardVersionControlInformation vci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation)
.registryName(registryName)
@@ -393,7 +398,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
@Override
public void verifyDeleteFlowRegistry(String registryId) {
- final ProcessGroup rootGroup = flowController.getRootGroup();
+ final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
final VersionControlInformation versionControlInformation = rootGroup.getVersionControlInformation();
if (versionControlInformation != null && versionControlInformation.getRegistryIdentifier().equals(registryId)) {