You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/19 19:30:54 UTC
[4/5] incubator-nifi git commit: NIFI-250: Refactoring of controller
service states
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
index fbd3dd7..66abf30 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
@@ -16,14 +16,27 @@
*/
package org.apache.nifi.controller.service;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.UUID;
import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.StandardProcessorNode;
+import org.apache.nifi.controller.service.mock.DummyProcessor;
import org.apache.nifi.controller.service.mock.ServiceA;
import org.apache.nifi.controller.service.mock.ServiceB;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.StandardProcessorInitializationContext;
+import org.apache.nifi.processor.StandardValidationContextFactory;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
public class TestStandardControllerServiceProvider {
@@ -68,11 +81,11 @@ public class TestStandardControllerServiceProvider {
@Test
- public void testActivateReferencingComponentsGraph() {
+ public void testEnableReferencingServicesGraph() {
final ProcessScheduler scheduler = Mockito.mock(ProcessScheduler.class);
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler);
- // build a graph of components with dependencies as such:
+ // build a graph of controller services with dependencies as such:
//
// A -> B -> D
// C ---^----^
@@ -81,7 +94,7 @@ public class TestStandardControllerServiceProvider {
// AND
// C references B and D.
//
- // So we have to verify that if D is enabled, when we enable its referencing components,
+ // So we have to verify that if D is enabled, when we enable its referencing services,
// we enable C and B, even if we attempt to enable C before B... i.e., if we try to enable C, we cannot do so
// until B is first enabled so ensure that we enable B first.
@@ -96,12 +109,102 @@ public class TestStandardControllerServiceProvider {
serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4");
provider.enableControllerService(serviceNode4);
- provider.activateReferencingComponents(serviceNode4);
-
- assertFalse(serviceNode3.isDisabled());
- assertFalse(serviceNode2.isDisabled());
- assertFalse(serviceNode1.isDisabled());
+ provider.enableReferencingServices(serviceNode4);
+ assertEquals(ControllerServiceState.DISABLED, serviceNode3.getState());
+ assertEquals(ControllerServiceState.DISABLED, serviceNode2.getState());
+ assertEquals(ControllerServiceState.DISABLED, serviceNode1.getState());
}
+
+ @Test
+ public void testStartStopReferencingComponents() {
+ final ProcessScheduler scheduler = Mockito.mock(ProcessScheduler.class);
+ final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler);
+
+ // build a graph of reporting tasks and controller services with dependencies as such:
+ //
+ // Processor P1 -> A -> B -> D
+ // Processor P2 -> C ---^----^
+ //
+ // In other words, Processor P1 references Controller Service A, which references B, which references D.
+ // AND
+ // Processor P2 references Controller Service C, which references B and D.
+ //
+ // So we have to verify that if D is enabled, when we enable its referencing services,
+ // we enable C and B, even if we attempt to enable C before B... i.e., if we try to enable C, we cannot do so
+ // until B is first enabled so ensure that we enable B first.
+
+ final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false);
+ final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2", false);
+ final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false);
+ final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false);
+
+ final ProcessGroup mockProcessGroup = Mockito.mock(ProcessGroup.class);
+ Mockito.doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ final ProcessorNode procNode = (ProcessorNode) invocation.getArguments()[0];
+ procNode.verifyCanStart();
+ procNode.setScheduledState(ScheduledState.RUNNING);
+ return null;
+ }
+ }).when(mockProcessGroup).startProcessor(Mockito.any(ProcessorNode.class));
+
+ Mockito.doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable {
+ final ProcessorNode procNode = (ProcessorNode) invocation.getArguments()[0];
+ procNode.verifyCanStop();
+ procNode.setScheduledState(ScheduledState.STOPPED);
+ return null;
+ }
+ }).when(mockProcessGroup).stopProcessor(Mockito.any(ProcessorNode.class));
+
+ final String id1 = UUID.randomUUID().toString();
+ final ProcessorNode procNodeA = new StandardProcessorNode(new DummyProcessor(), id1,
+ new StandardValidationContextFactory(provider), scheduler, provider);
+ procNodeA.getProcessor().initialize(new StandardProcessorInitializationContext(id1, null, provider));
+ procNodeA.setProperty(DummyProcessor.SERVICE.getName(), "1");
+ procNodeA.setProcessGroup(mockProcessGroup);
+
+ final String id2 = UUID.randomUUID().toString();
+ final ProcessorNode procNodeB = new StandardProcessorNode(new DummyProcessor(),id2,
+ new StandardValidationContextFactory(provider), scheduler, provider);
+ procNodeB.getProcessor().initialize(new StandardProcessorInitializationContext(id2, null, provider));
+ procNodeB.setProperty(DummyProcessor.SERVICE.getName(), "3");
+ procNodeB.setProcessGroup(mockProcessGroup);
+
+ serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
+ serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
+ serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
+ serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4");
+
+ provider.enableControllerService(serviceNode4);
+ provider.enableReferencingServices(serviceNode4);
+ provider.scheduleReferencingComponents(serviceNode4);
+
+ assertEquals(ControllerServiceState.DISABLED, serviceNode3.getState());
+ assertEquals(ControllerServiceState.DISABLED, serviceNode2.getState());
+ assertEquals(ControllerServiceState.DISABLED, serviceNode1.getState());
+ assertTrue(procNodeA.isRunning());
+ assertTrue(procNodeB.isRunning());
+
+ // stop processors and verify results.
+ provider.unscheduleReferencingComponents(serviceNode4);
+ assertFalse(procNodeA.isRunning());
+ assertFalse(procNodeB.isRunning());
+ assertEquals(ControllerServiceState.DISABLED, serviceNode3.getState());
+ assertEquals(ControllerServiceState.DISABLED, serviceNode2.getState());
+ assertEquals(ControllerServiceState.DISABLED, serviceNode1.getState());
+
+ provider.disableReferencingServices(serviceNode4);
+ assertEquals(ControllerServiceState.DISABLED, serviceNode3.getState());
+ assertEquals(ControllerServiceState.DISABLED, serviceNode2.getState());
+ assertEquals(ControllerServiceState.DISABLED, serviceNode1.getState());
+ assertEquals(ControllerServiceState.ENABLED, serviceNode4.getState());
+
+ provider.disableControllerService(serviceNode4);
+ assertEquals(ControllerServiceState.DISABLED, serviceNode4.getState());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java
new file mode 100644
index 0000000..615e172
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service.mock;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+public class DummyProcessor extends AbstractProcessor {
+
+ public static final PropertyDescriptor SERVICE = new PropertyDescriptor.Builder()
+ .name("Controller Service")
+ .identifiesControllerService(ControllerService.class)
+ .required(true)
+ .build();
+
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(SERVICE);
+ return descriptors;
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
index c55960f..9c2f0e0 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
@@ -146,9 +146,9 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
if (enabled != null) {
if (enabled) {
-
+ serviceProvider.enableReferencingServices(controllerService);
} else {
-
+ serviceProvider.disableReferencingServices(controllerService);
}
} else if (state != null) {
try {
@@ -156,13 +156,14 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
switch (scheduledState) {
case RUNNING:
-
+ serviceProvider.scheduleReferencingComponents(controllerService);
break;
case STOPPED:
-
+ serviceProvider.unscheduleReferencingComponents(controllerService);
break;
- default:
-
+ default:
+ throw new IllegalArgumentException(String.format(
+ "The specified state (%s) is not valid. Valid options are 'RUNNING' and 'STOPPED'.", state));
}
} catch (IllegalArgumentException iae) {
throw new IllegalArgumentException(String.format(