You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/19 19:30:54 UTC

[4/5] incubator-nifi git commit: NIFI-250: Refactoring of controller service states

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
index fbd3dd7..66abf30 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
@@ -16,14 +16,27 @@
  */
 package org.apache.nifi.controller.service;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.UUID;
 
 import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.StandardProcessorNode;
+import org.apache.nifi.controller.service.mock.DummyProcessor;
 import org.apache.nifi.controller.service.mock.ServiceA;
 import org.apache.nifi.controller.service.mock.ServiceB;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.StandardProcessorInitializationContext;
+import org.apache.nifi.processor.StandardValidationContextFactory;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestStandardControllerServiceProvider {
 
@@ -68,11 +81,11 @@ public class TestStandardControllerServiceProvider {
     
     
     @Test
-    public void testActivateReferencingComponentsGraph() {
+    public void testEnableReferencingServicesGraph() {
         final ProcessScheduler scheduler = Mockito.mock(ProcessScheduler.class);
         final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler);
         
-        // build a graph of components with dependencies as such:
+        // build a graph of controller services with dependencies as such:
         //
         // A -> B -> D
         // C ---^----^
@@ -81,7 +94,7 @@ public class TestStandardControllerServiceProvider {
         // AND
         // C references B and D.
         //
-        // So we have to verify that if D is enabled, when we enable its referencing components,
+        // So we have to verify that if D is enabled, when we enable its referencing services,
         // we enable C and B, even if we attempt to enable C before B... i.e., if we try to enable C, we cannot do so
         // until B is first enabled so ensure that we enable B first.
         
@@ -96,12 +109,102 @@ public class TestStandardControllerServiceProvider {
         serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4");
         
         provider.enableControllerService(serviceNode4);
-        provider.activateReferencingComponents(serviceNode4);
-        
-        assertFalse(serviceNode3.isDisabled());
-        assertFalse(serviceNode2.isDisabled());
-        assertFalse(serviceNode1.isDisabled());
+        provider.enableReferencingServices(serviceNode4);
         
+        assertEquals(ControllerServiceState.DISABLED, serviceNode3.getState());
+        assertEquals(ControllerServiceState.DISABLED, serviceNode2.getState());
+        assertEquals(ControllerServiceState.DISABLED, serviceNode1.getState());
     }
     
+    
+    @Test
+    public void testStartStopReferencingComponents() {
+        final ProcessScheduler scheduler = Mockito.mock(ProcessScheduler.class);
+        final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler);
+        
+        // build a graph of reporting tasks and controller services with dependencies as such:
+        //
+        // Processor P1 -> A -> B -> D
+        // Processor P2 -> C ---^----^
+        //
+        // In other words, Processor P1 references Controller Service A, which references B, which references D.
+        // AND
+        // Processor P2 references Controller Service C, which references B and D.
+        //
+        // So we have to verify that if D is enabled, when we enable its referencing services,
+        // we enable C and B, even if we attempt to enable C before B... i.e., if we try to enable C, we cannot do so
+        // until B is first enabled so ensure that we enable B first.
+        
+        final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false);
+        final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2", false);
+        final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false);
+        final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false);
+        
+        final ProcessGroup mockProcessGroup = Mockito.mock(ProcessGroup.class);
+        Mockito.doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                final ProcessorNode procNode = (ProcessorNode) invocation.getArguments()[0];
+                procNode.verifyCanStart();
+                procNode.setScheduledState(ScheduledState.RUNNING);
+                return null;
+            }
+        }).when(mockProcessGroup).startProcessor(Mockito.any(ProcessorNode.class));
+        
+        Mockito.doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(final InvocationOnMock invocation) throws Throwable {
+                final ProcessorNode procNode = (ProcessorNode) invocation.getArguments()[0];
+                procNode.verifyCanStop();
+                procNode.setScheduledState(ScheduledState.STOPPED);
+                return null;
+            }
+        }).when(mockProcessGroup).stopProcessor(Mockito.any(ProcessorNode.class));
+        
+        final String id1 = UUID.randomUUID().toString();
+        final ProcessorNode procNodeA = new StandardProcessorNode(new DummyProcessor(), id1,
+                new StandardValidationContextFactory(provider), scheduler, provider);
+        procNodeA.getProcessor().initialize(new StandardProcessorInitializationContext(id1, null, provider));
+        procNodeA.setProperty(DummyProcessor.SERVICE.getName(), "1");
+        procNodeA.setProcessGroup(mockProcessGroup);
+        
+        final String id2 = UUID.randomUUID().toString();
+        final ProcessorNode procNodeB = new StandardProcessorNode(new DummyProcessor(),id2,
+                new StandardValidationContextFactory(provider), scheduler, provider);
+        procNodeB.getProcessor().initialize(new StandardProcessorInitializationContext(id2, null, provider));
+        procNodeB.setProperty(DummyProcessor.SERVICE.getName(), "3");
+        procNodeB.setProcessGroup(mockProcessGroup);
+        
+        serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
+        serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
+        serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
+        serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4");
+        
+        provider.enableControllerService(serviceNode4);
+        provider.enableReferencingServices(serviceNode4);
+        provider.scheduleReferencingComponents(serviceNode4);
+        
+        assertEquals(ControllerServiceState.DISABLED, serviceNode3.getState());
+        assertEquals(ControllerServiceState.DISABLED, serviceNode2.getState());
+        assertEquals(ControllerServiceState.DISABLED, serviceNode1.getState());
+        assertTrue(procNodeA.isRunning());
+        assertTrue(procNodeB.isRunning());
+        
+        // stop processors and verify results.
+        provider.unscheduleReferencingComponents(serviceNode4);
+        assertFalse(procNodeA.isRunning());
+        assertFalse(procNodeB.isRunning());
+        assertEquals(ControllerServiceState.DISABLED, serviceNode3.getState());
+        assertEquals(ControllerServiceState.DISABLED, serviceNode2.getState());
+        assertEquals(ControllerServiceState.DISABLED, serviceNode1.getState());
+        
+        provider.disableReferencingServices(serviceNode4);
+        assertEquals(ControllerServiceState.DISABLED, serviceNode3.getState());
+        assertEquals(ControllerServiceState.DISABLED, serviceNode2.getState());
+        assertEquals(ControllerServiceState.DISABLED, serviceNode1.getState());
+        assertEquals(ControllerServiceState.ENABLED, serviceNode4.getState());
+        
+        provider.disableControllerService(serviceNode4);
+        assertEquals(ControllerServiceState.DISABLED, serviceNode4.getState());
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java
new file mode 100644
index 0000000..615e172
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service.mock;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+public class DummyProcessor extends AbstractProcessor {
+
+    public static final PropertyDescriptor SERVICE = new PropertyDescriptor.Builder()
+        .name("Controller Service")
+        .identifiesControllerService(ControllerService.class)
+        .required(true)
+        .build();
+    
+    
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(SERVICE);
+        return descriptors;
+    }
+    
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
index c55960f..9c2f0e0 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
@@ -146,9 +146,9 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
         
         if (enabled != null) {
             if (enabled) {
-                
+                serviceProvider.enableReferencingServices(controllerService);
             } else {
-                
+                serviceProvider.disableReferencingServices(controllerService);
             }
         } else if (state != null) {
             try {
@@ -156,13 +156,14 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
                 
                 switch (scheduledState) {
                     case RUNNING:
-                        
+                        serviceProvider.scheduleReferencingComponents(controllerService);
                         break;
                     case STOPPED:
-                        
+                        serviceProvider.unscheduleReferencingComponents(controllerService);
                         break;
-                    default: 
-                        
+                    default:
+                        throw new IllegalArgumentException(String.format(
+                                "The specified state (%s) is not valid. Valid options are 'RUNNING' and 'STOPPED'.", state));
                 }
             } catch (IllegalArgumentException iae) {
                 throw new IllegalArgumentException(String.format(