You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/03/24 15:10:58 UTC
[11/17] nifi git commit: NIFI-3380 Bumping NAR plugin to
1.2.0-SNAPSHOT development to leverage changes from master,
adding buildnumber-maven-plugin to nifi-nar-bundles to properly set build info
in MANIFEST of NARs - Refactoring NarDetails to include al
http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
index f770f5e..557bd62 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
@@ -16,39 +16,34 @@
*/
package org.apache.nifi.controller;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.when;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Set;
-
import org.apache.commons.io.IOUtils;
import org.apache.nifi.admin.service.AuditService;
+import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer;
import org.apache.nifi.authorization.AccessPolicy;
import org.apache.nifi.authorization.Group;
import org.apache.nifi.authorization.MockPolicyBasedAuthorizer;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.User;
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.serialization.FlowSynchronizer;
import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.mock.DummyProcessor;
+import org.apache.nifi.controller.service.mock.DummyReportingTask;
+import org.apache.nifi.controller.service.mock.ServiceA;
+import org.apache.nifi.controller.service.mock.ServiceB;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.logging.LogRepository;
+import org.apache.nifi.logging.LogRepositoryFactory;
+import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.MockProvenanceRepository;
import org.apache.nifi.registry.VariableRegistry;
@@ -56,21 +51,53 @@ import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FileBasedVariableRegistry;
import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.api.dto.BundleDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
+import org.apache.nifi.web.api.dto.FlowSnippetDTO;
+import org.apache.nifi.web.api.dto.PositionDTO;
+import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
+import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class TestFlowController {
private FlowController controller;
private AbstractPolicyBasedAuthorizer authorizer;
- private StandardFlowSynchronizer standardFlowSynchronizer;
private FlowFileEventRepository flowFileEventRepo;
private AuditService auditService;
private StringEncryptor encryptor;
private NiFiProperties nifiProperties;
+ private Bundle systemBundle;
private BulletinRepository bulletinRepo;
private VariableRegistry variableRegistry;
@@ -87,6 +114,10 @@ public class TestFlowController {
nifiProperties = NiFiProperties.createBasicNiFiProperties(null, otherProps);
encryptor = StringEncryptor.createEncryptor(nifiProperties);
+ // use the system bundle
+ systemBundle = ExtensionManager.createSystemBundle(nifiProperties);
+ ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet());
+
User user1 = new User.Builder().identifier("user-id-1").identity("user-1").build();
User user2 = new User.Builder().identifier("user-id-2").identity("user-2").build();
@@ -128,8 +159,6 @@ public class TestFlowController {
bulletinRepo = Mockito.mock(BulletinRepository.class);
controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer, auditService, encryptor, bulletinRepo, variableRegistry);
-
- standardFlowSynchronizer = new StandardFlowSynchronizer(StringEncryptor.createEncryptor(nifiProperties), nifiProperties);
}
@After
@@ -139,6 +168,8 @@ public class TestFlowController {
@Test
public void testSynchronizeFlowWithReportingTaskAndProcessorReferencingControllerService() throws IOException {
+ final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(StringEncryptor.createEncryptor(nifiProperties), nifiProperties);
+
// create a mock proposed data flow with the same auth fingerprint as the current authorizer
final String authFingerprint = authorizer.getFingerprint();
final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
@@ -200,6 +231,8 @@ public class TestFlowController {
@Test
public void testSynchronizeFlowWithProcessorReferencingControllerService() throws IOException {
+ final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(StringEncryptor.createEncryptor(nifiProperties), nifiProperties);
+
// create a mock proposed data flow with the same auth fingerprint as the current authorizer
final String authFingerprint = authorizer.getFingerprint();
final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
@@ -237,6 +270,8 @@ public class TestFlowController {
@Test
public void testSynchronizeFlowWhenAuthorizationsAreEqual() {
+ final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(StringEncryptor.createEncryptor(nifiProperties), nifiProperties);
+
// create a mock proposed data flow with the same auth fingerprint as the current authorizer
final String authFingerprint = authorizer.getFingerprint();
final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
@@ -249,6 +284,8 @@ public class TestFlowController {
@Test(expected = UninheritableFlowException.class)
public void testSynchronizeFlowWhenAuthorizationsAreDifferent() {
+ final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(StringEncryptor.createEncryptor(nifiProperties), nifiProperties);
+
// create a mock proposed data flow with different auth fingerprint as the current authorizer
final String authFingerprint = "<authorizations></authorizations>";
final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
@@ -260,6 +297,8 @@ public class TestFlowController {
@Test(expected = UninheritableFlowException.class)
public void testSynchronizeFlowWhenProposedAuthorizationsAreNull() {
+ final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(StringEncryptor.createEncryptor(nifiProperties), nifiProperties);
+
final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(null);
@@ -268,6 +307,8 @@ public class TestFlowController {
@Test
public void testSynchronizeFlowWhenCurrentAuthorizationsAreEmptyAndProposedAreNot() {
+ final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(StringEncryptor.createEncryptor(nifiProperties), nifiProperties);
+
// create a mock proposed data flow with the same auth fingerprint as the current authorizer
final String authFingerprint = authorizer.getFingerprint();
final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
@@ -283,8 +324,115 @@ public class TestFlowController {
}
@Test
+ public void testSynchronizeFlowWhenProposedMissingComponentsAreDifferent() {
+ final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(StringEncryptor.createEncryptor(nifiProperties), nifiProperties);
+
+ final Set<String> missingComponents = new HashSet<>();
+ missingComponents.add("1");
+ missingComponents.add("2");
+
+ final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
+ when(proposedDataFlow.getMissingComponents()).thenReturn(missingComponents);
+
+ try {
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow);
+ Assert.fail("Should have thrown exception");
+ } catch (UninheritableFlowException e) {
+ assertTrue(e.getMessage().contains("Proposed flow has missing components that are not considered missing in the current flow (1,2)"));
+ }
+ }
+
+ @Test
+ public void testSynchronizeFlowWhenExistingMissingComponentsAreDifferent() throws IOException {
+ final StringEncryptor stringEncryptor = StringEncryptor.createEncryptor(nifiProperties);
+ final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(stringEncryptor, nifiProperties);
+
+ final ProcessorNode mockProcessorNode = mock(ProcessorNode.class);
+ when(mockProcessorNode.getIdentifier()).thenReturn("1");
+ when(mockProcessorNode.isExtensionMissing()).thenReturn(true);
+
+ final ControllerServiceNode mockControllerServiceNode = mock(ControllerServiceNode.class);
+ when(mockControllerServiceNode.getIdentifier()).thenReturn("2");
+ when(mockControllerServiceNode.isExtensionMissing()).thenReturn(true);
+
+ final ReportingTaskNode mockReportingTaskNode = mock(ReportingTaskNode.class);
+ when(mockReportingTaskNode.getIdentifier()).thenReturn("3");
+ when(mockReportingTaskNode.isExtensionMissing()).thenReturn(true);
+
+ final ProcessGroup mockRootGroup = mock(ProcessGroup.class);
+ when(mockRootGroup.findAllProcessors()).thenReturn(Collections.singletonList(mockProcessorNode));
+
+ final SnippetManager mockSnippetManager = mock(SnippetManager.class);
+ when(mockSnippetManager.export()).thenReturn(new byte[0]);
+
+ final FlowController mockFlowController = mock(FlowController.class);
+ when(mockFlowController.getRootGroup()).thenReturn(mockRootGroup);
+ when(mockFlowController.getAllControllerServices()).thenReturn(new HashSet<>(Arrays.asList(mockControllerServiceNode)));
+ when(mockFlowController.getAllReportingTasks()).thenReturn(new HashSet<>(Arrays.asList(mockReportingTaskNode)));
+ when(mockFlowController.getAuthorizer()).thenReturn(authorizer);
+ when(mockFlowController.getSnippetManager()).thenReturn(mockSnippetManager);
+
+ final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
+ when(proposedDataFlow.getMissingComponents()).thenReturn(new HashSet<>());
+
+ try {
+ standardFlowSynchronizer.sync(mockFlowController, proposedDataFlow, stringEncryptor);
+ Assert.fail("Should have thrown exception");
+ } catch (UninheritableFlowException e) {
+ assertTrue(e.getMessage(), e.getMessage().contains("Current flow has missing components that are not considered missing in the proposed flow (1,2,3)"));
+ }
+ }
+
+ @Test
+ public void testSynchronizeFlowWhenBundlesAreSame() throws IOException {
+ final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(StringEncryptor.createEncryptor(nifiProperties), nifiProperties);
+
+ final LogRepository logRepository = LogRepositoryFactory.getRepository("d89ada5d-35fb-44ff-83f1-4cc00b48b2df");
+ logRepository.removeAllObservers();
+
+ syncFlow("src/test/resources/nifi/fingerprint/flow4.xml", standardFlowSynchronizer);
+ syncFlow("src/test/resources/nifi/fingerprint/flow4.xml", standardFlowSynchronizer);
+ }
+
+ @Test
+ public void testSynchronizeFlowWhenBundlesAreDifferent() throws IOException {
+ final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(StringEncryptor.createEncryptor(nifiProperties), nifiProperties);
+
+ final LogRepository logRepository = LogRepositoryFactory.getRepository("d89ada5d-35fb-44ff-83f1-4cc00b48b2df");
+ logRepository.removeAllObservers();
+
+ // first sync should work because we are syncing to an empty flow controller
+ syncFlow("src/test/resources/nifi/fingerprint/flow4.xml", standardFlowSynchronizer);
+
+ // second sync should fail because the bundle of the processor is different
+ try {
+ syncFlow("src/test/resources/nifi/fingerprint/flow4-with-different-bundle.xml", standardFlowSynchronizer);
+ Assert.fail("Should have thrown UninheritableFlowException");
+ } catch (UninheritableFlowException e) {
+ //e.printStackTrace();
+ }
+ }
+
+ private void syncFlow(String flowXmlFile, FlowSynchronizer standardFlowSynchronizer) throws IOException {
+ String flowString = null;
+ try (final InputStream in = new FileInputStream(flowXmlFile)) {
+ flowString = IOUtils.toString(in, StandardCharsets.UTF_8);
+ }
+ assertNotNull(flowString);
+
+ final DataFlow proposedDataFlow1 = Mockito.mock(DataFlow.class);
+ when(proposedDataFlow1.getFlow()).thenReturn(flowString.getBytes(StandardCharsets.UTF_8));
+
+ final String authFingerprint = authorizer.getFingerprint();
+ when(proposedDataFlow1.getAuthorizerFingerprint()).thenReturn(authFingerprint.getBytes(StandardCharsets.UTF_8));
+
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow1);
+ }
+
+ @Test
public void testCreateMissingProcessor() throws ProcessorInstantiationException {
- final ProcessorNode procNode = controller.createProcessor("org.apache.nifi.NonExistingProcessor", "1234-Processor");
+ final ProcessorNode procNode = controller.createProcessor("org.apache.nifi.NonExistingProcessor", "1234-Processor",
+ systemBundle.getBundleDetails().getCoordinate());
assertNotNull(procNode);
assertEquals("org.apache.nifi.NonExistingProcessor", procNode.getCanonicalClassName());
assertEquals("(Missing) NonExistingProcessor", procNode.getComponentType());
@@ -301,7 +449,8 @@ public class TestFlowController {
@Test
public void testCreateMissingReportingTask() throws ReportingTaskInstantiationException {
- final ReportingTaskNode taskNode = controller.createReportingTask("org.apache.nifi.NonExistingReportingTask", "1234-Reporting-Task", true);
+ final ReportingTaskNode taskNode = controller.createReportingTask("org.apache.nifi.NonExistingReportingTask", "1234-Reporting-Task",
+ systemBundle.getBundleDetails().getCoordinate(), true);
assertNotNull(taskNode);
assertEquals("org.apache.nifi.NonExistingReportingTask", taskNode.getCanonicalClassName());
assertEquals("(Missing) NonExistingReportingTask", taskNode.getComponentType());
@@ -315,7 +464,8 @@ public class TestFlowController {
@Test
public void testCreateMissingControllerService() throws ProcessorInstantiationException {
- final ControllerServiceNode serviceNode = controller.createControllerService("org.apache.nifi.NonExistingControllerService", "1234-Controller-Service", false);
+ final ControllerServiceNode serviceNode = controller.createControllerService("org.apache.nifi.NonExistingControllerService", "1234-Controller-Service",
+ systemBundle.getBundleDetails().getCoordinate(), false);
assertNotNull(serviceNode);
assertEquals("org.apache.nifi.NonExistingControllerService", serviceNode.getCanonicalClassName());
assertEquals("(Missing) NonExistingControllerService", serviceNode.getComponentType());
@@ -333,8 +483,9 @@ public class TestFlowController {
}
@Test
- public void testProcessorDefaultScheduleAnnotation() throws ProcessorInstantiationException {
- ProcessorNode p_scheduled = controller.createProcessor(DummyScheduledProcessor.class.getName(),"1234-ScheduledProcessor");
+ public void testProcessorDefaultScheduleAnnotation() throws ProcessorInstantiationException,ClassNotFoundException,InstantiationException,IllegalAccessException {
+ ProcessorNode p_scheduled = controller.createProcessor(DummyScheduledProcessor.class.getName(),"1234-ScheduledProcessor",
+ systemBundle.getBundleDetails().getCoordinate());
assertEquals(5,p_scheduled.getMaxConcurrentTasks());
assertEquals(SchedulingStrategy.CRON_DRIVEN,p_scheduled.getSchedulingStrategy());
assertEquals("0 0 0 1/1 * ?",p_scheduled.getSchedulingPeriod());
@@ -345,14 +496,15 @@ public class TestFlowController {
@Test
public void testReportingTaskDefaultScheduleAnnotation() throws ReportingTaskInstantiationException {
- ReportingTaskNode p_scheduled = controller.createReportingTask(DummyScheduledReportingTask.class.getName());
+ ReportingTaskNode p_scheduled = controller.createReportingTask(DummyScheduledReportingTask.class.getName(), systemBundle.getBundleDetails().getCoordinate());
assertEquals(SchedulingStrategy.CRON_DRIVEN,p_scheduled.getSchedulingStrategy());
assertEquals("0 0 0 1/1 * ?",p_scheduled.getSchedulingPeriod());
}
@Test
- public void testProcessorDefaultSettingsAnnotation() throws ProcessorInstantiationException {
- ProcessorNode p_settings = controller.createProcessor(DummySettingsProcessor.class.getName(),"1234-SettingsProcessor");
+ public void testProcessorDefaultSettingsAnnotation() throws ProcessorInstantiationException,ClassNotFoundException {
+
+ ProcessorNode p_settings = controller.createProcessor(DummySettingsProcessor.class.getName(),"1234-SettingsProcessor", systemBundle.getBundleDetails().getCoordinate());
assertEquals("5 sec",p_settings.getYieldPeriod());
assertEquals("1 min",p_settings.getPenalizationPeriod());
assertEquals(LogLevel.DEBUG,p_settings.getBulletinLevel());
@@ -365,7 +517,8 @@ public class TestFlowController {
public void testDeleteProcessGroup() {
ProcessGroup pg = controller.createProcessGroup("my-process-group");
pg.setName("my-process-group");
- ControllerServiceNode cs = controller.createControllerService("org.apache.nifi.NonExistingControllerService", "my-controller-service", false);
+ ControllerServiceNode cs = controller.createControllerService("org.apache.nifi.NonExistingControllerService", "my-controller-service",
+ systemBundle.getBundleDetails().getCoordinate(), false);
pg.addControllerService(cs);
controller.getRootGroup().addProcessGroup(pg);
controller.getRootGroup().removeProcessGroup(pg);
@@ -373,4 +526,281 @@ public class TestFlowController {
assertTrue(pg.getControllerServices(true).isEmpty());
}
+ @Test
+ public void testChangeProcessorType() throws ProcessorInstantiationException {
+ final String id = "1234-ScheduledProcessor" + System.currentTimeMillis();
+ final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate();
+ final ProcessorNode processorNode = controller.createProcessor(DummyScheduledProcessor.class.getName(), id, coordinate);
+ final String originalName = processorNode.getName();
+
+ assertEquals(id, processorNode.getIdentifier());
+ assertEquals(id, processorNode.getComponent().getIdentifier());
+ assertEquals(coordinate.getCoordinate(), processorNode.getBundleCoordinate().getCoordinate());
+ assertEquals(DummyScheduledProcessor.class.getCanonicalName(), processorNode.getCanonicalClassName());
+ assertEquals(DummyScheduledProcessor.class.getSimpleName(), processorNode.getComponentType());
+ assertEquals(DummyScheduledProcessor.class.getCanonicalName(), processorNode.getComponent().getClass().getCanonicalName());
+
+ assertEquals(5, processorNode.getMaxConcurrentTasks());
+ assertEquals(SchedulingStrategy.CRON_DRIVEN, processorNode.getSchedulingStrategy());
+ assertEquals("0 0 0 1/1 * ?",processorNode.getSchedulingPeriod());
+ assertEquals("1 sec", processorNode.getYieldPeriod());
+ assertEquals("30 sec", processorNode.getPenalizationPeriod());
+ assertEquals(LogLevel.WARN, processorNode.getBulletinLevel());
+
+ // now change the type of the processor from DummyScheduledProcessor to DummySettingsProcessor
+ controller.changeProcessorType(processorNode, DummySettingsProcessor.class.getName(), coordinate);
+
+ // ids and coordinate should stay the same
+ assertEquals(id, processorNode.getIdentifier());
+ assertEquals(id, processorNode.getComponent().getIdentifier());
+ assertEquals(coordinate.getCoordinate(), processorNode.getBundleCoordinate().getCoordinate());
+
+ // in this test we happened to change between two processors that have different canonical class names
+ // but in the running application the DAO layer would call verifyCanUpdateBundle and would prevent this so
+ // for the sake of this test it is ok that the canonical class name hasn't changed
+ assertEquals(originalName, processorNode.getName());
+ assertEquals(DummyScheduledProcessor.class.getCanonicalName(), processorNode.getCanonicalClassName());
+ assertEquals(DummyScheduledProcessor.class.getSimpleName(), processorNode.getComponentType());
+ assertEquals(DummySettingsProcessor.class.getCanonicalName(), processorNode.getComponent().getClass().getCanonicalName());
+
+ // all these settings should have stayed the same
+ assertEquals(5, processorNode.getMaxConcurrentTasks());
+ assertEquals(SchedulingStrategy.CRON_DRIVEN, processorNode.getSchedulingStrategy());
+ assertEquals("0 0 0 1/1 * ?", processorNode.getSchedulingPeriod());
+ assertEquals("1 sec", processorNode.getYieldPeriod());
+ assertEquals("30 sec", processorNode.getPenalizationPeriod());
+ assertEquals(LogLevel.WARN, processorNode.getBulletinLevel());
+ }
+
+ @Test
+ public void testChangeControllerServiceType() {
+ final String id = "ServiceA" + System.currentTimeMillis();
+ final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate();
+ final ControllerServiceNode controllerServiceNode = controller.createControllerService(ServiceA.class.getName(), id, coordinate, true);
+ final String originalName = controllerServiceNode.getName();
+
+ assertEquals(id, controllerServiceNode.getIdentifier());
+ assertEquals(id, controllerServiceNode.getComponent().getIdentifier());
+ assertEquals(coordinate.getCoordinate(), controllerServiceNode.getBundleCoordinate().getCoordinate());
+ assertEquals(ServiceA.class.getCanonicalName(), controllerServiceNode.getCanonicalClassName());
+ assertEquals(ServiceA.class.getSimpleName(), controllerServiceNode.getComponentType());
+ assertEquals(ServiceA.class.getCanonicalName(), controllerServiceNode.getComponent().getClass().getCanonicalName());
+
+ controller.changeControllerServiceType(controllerServiceNode, ServiceB.class.getName(), coordinate);
+
+ // ids and coordinate should stay the same
+ assertEquals(id, controllerServiceNode.getIdentifier());
+ assertEquals(id, controllerServiceNode.getComponent().getIdentifier());
+ assertEquals(coordinate.getCoordinate(), controllerServiceNode.getBundleCoordinate().getCoordinate());
+
+ // in this test we happened to change between two services that have different canonical class names
+ // but in the running application the DAO layer would call verifyCanUpdateBundle and would prevent this so
+ // for the sake of this test it is ok that the canonical class name hasn't changed
+ assertEquals(originalName, controllerServiceNode.getName());
+ assertEquals(ServiceA.class.getCanonicalName(), controllerServiceNode.getCanonicalClassName());
+ assertEquals(ServiceA.class.getSimpleName(), controllerServiceNode.getComponentType());
+ assertEquals(ServiceB.class.getCanonicalName(), controllerServiceNode.getComponent().getClass().getCanonicalName());
+ }
+
+ @Test
+ public void testChangeReportingTaskType() throws ReportingTaskInstantiationException {
+ final String id = "ReportingTask" + System.currentTimeMillis();
+ final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate();
+ final ReportingTaskNode node = controller.createReportingTask(DummyReportingTask.class.getName(), id, coordinate, true);
+ final String originalName = node.getName();
+
+ assertEquals(id, node.getIdentifier());
+ assertEquals(id, node.getComponent().getIdentifier());
+ assertEquals(coordinate.getCoordinate(), node.getBundleCoordinate().getCoordinate());
+ assertEquals(DummyReportingTask.class.getCanonicalName(), node.getCanonicalClassName());
+ assertEquals(DummyReportingTask.class.getSimpleName(), node.getComponentType());
+ assertEquals(DummyReportingTask.class.getCanonicalName(), node.getComponent().getClass().getCanonicalName());
+
+ controller.changeReportingTaskType(node, DummyScheduledReportingTask.class.getName(), coordinate);
+
+ // ids and coordinate should stay the same
+ assertEquals(id, node.getIdentifier());
+ assertEquals(id, node.getComponent().getIdentifier());
+ assertEquals(coordinate.getCoordinate(), node.getBundleCoordinate().getCoordinate());
+
+ // in this test we happened to change between two services that have different canonical class names
+ // but in the running application the DAO layer would call verifyCanUpdateBundle and would prevent this so
+ // for the sake of this test it is ok that the canonical class name hasn't changed
+ assertEquals(originalName, node.getName());
+ assertEquals(DummyReportingTask.class.getCanonicalName(), node.getCanonicalClassName());
+ assertEquals(DummyReportingTask.class.getSimpleName(), node.getComponentType());
+ assertEquals(DummyScheduledReportingTask.class.getCanonicalName(), node.getComponent().getClass().getCanonicalName());
+
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInstantiateSnippetWhenProcessorMissingBundle() throws Exception {
+ final String id = UUID.randomUUID().toString();
+ final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate();
+ final ProcessorNode processorNode = controller.createProcessor(DummyProcessor.class.getName(), id, coordinate);
+
+ // create a processor dto
+ final ProcessorDTO processorDTO = new ProcessorDTO();
+ processorDTO.setId(UUID.randomUUID().toString()); // use a different id here
+ processorDTO.setPosition(new PositionDTO(new Double(0), new Double(0)));
+ processorDTO.setStyle(processorNode.getStyle());
+ processorDTO.setParentGroupId("1234");
+ processorDTO.setInputRequirement(processorNode.getInputRequirement().name());
+ processorDTO.setPersistsState(processorNode.getProcessor().getClass().isAnnotationPresent(Stateful.class));
+ processorDTO.setRestricted(processorNode.isRestricted());
+ processorDTO.setExtensionMissing(processorNode.isExtensionMissing());
+
+ processorDTO.setType(processorNode.getCanonicalClassName());
+ processorDTO.setBundle(null); // missing bundle
+ processorDTO.setName(processorNode.getName());
+ processorDTO.setState(processorNode.getScheduledState().toString());
+
+ processorDTO.setRelationships(new ArrayList<>());
+
+ processorDTO.setDescription("description");
+ processorDTO.setSupportsParallelProcessing(!processorNode.isTriggeredSerially());
+ processorDTO.setSupportsEventDriven(processorNode.isEventDrivenSupported());
+ processorDTO.setSupportsBatching(processorNode.isHighThroughputSupported());
+
+ ProcessorConfigDTO configDTO = new ProcessorConfigDTO();
+ configDTO.setSchedulingPeriod(processorNode.getSchedulingPeriod());
+ configDTO.setPenaltyDuration(processorNode.getPenalizationPeriod());
+ configDTO.setYieldDuration(processorNode.getYieldPeriod());
+ configDTO.setRunDurationMillis(processorNode.getRunDuration(TimeUnit.MILLISECONDS));
+ configDTO.setConcurrentlySchedulableTaskCount(processorNode.getMaxConcurrentTasks());
+ configDTO.setLossTolerant(processorNode.isLossTolerant());
+ configDTO.setComments(processorNode.getComments());
+ configDTO.setBulletinLevel(processorNode.getBulletinLevel().name());
+ configDTO.setSchedulingStrategy(processorNode.getSchedulingStrategy().name());
+ configDTO.setExecutionNode(processorNode.getExecutionNode().name());
+ configDTO.setAnnotationData(processorNode.getAnnotationData());
+
+ processorDTO.setConfig(configDTO);
+
+ // create the snippet with the processor
+ final FlowSnippetDTO flowSnippetDTO = new FlowSnippetDTO();
+ flowSnippetDTO.setProcessors(Collections.singleton(processorDTO));
+
+ // instantiate the snippet
+ assertEquals(0, controller.getRootGroup().getProcessors().size());
+ controller.instantiateSnippet(controller.getRootGroup(), flowSnippetDTO);
+ }
+
+ @Test
+ public void testInstantiateSnippetWithProcessor() throws ProcessorInstantiationException {
+ final String id = UUID.randomUUID().toString();
+ final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate();
+ final ProcessorNode processorNode = controller.createProcessor(DummyProcessor.class.getName(), id, coordinate);
+
+ // create a processor dto
+ final ProcessorDTO processorDTO = new ProcessorDTO();
+ processorDTO.setId(UUID.randomUUID().toString()); // use a different id here
+ processorDTO.setPosition(new PositionDTO(new Double(0), new Double(0)));
+ processorDTO.setStyle(processorNode.getStyle());
+ processorDTO.setParentGroupId("1234");
+ processorDTO.setInputRequirement(processorNode.getInputRequirement().name());
+ processorDTO.setPersistsState(processorNode.getProcessor().getClass().isAnnotationPresent(Stateful.class));
+ processorDTO.setRestricted(processorNode.isRestricted());
+ processorDTO.setExtensionMissing(processorNode.isExtensionMissing());
+
+ processorDTO.setType(processorNode.getCanonicalClassName());
+ processorDTO.setBundle(new BundleDTO(coordinate.getGroup(), coordinate.getId(), coordinate.getVersion()));
+ processorDTO.setName(processorNode.getName());
+ processorDTO.setState(processorNode.getScheduledState().toString());
+
+ processorDTO.setRelationships(new ArrayList<>());
+
+ processorDTO.setDescription("description");
+ processorDTO.setSupportsParallelProcessing(!processorNode.isTriggeredSerially());
+ processorDTO.setSupportsEventDriven(processorNode.isEventDrivenSupported());
+ processorDTO.setSupportsBatching(processorNode.isHighThroughputSupported());
+
+ ProcessorConfigDTO configDTO = new ProcessorConfigDTO();
+ configDTO.setSchedulingPeriod(processorNode.getSchedulingPeriod());
+ configDTO.setPenaltyDuration(processorNode.getPenalizationPeriod());
+ configDTO.setYieldDuration(processorNode.getYieldPeriod());
+ configDTO.setRunDurationMillis(processorNode.getRunDuration(TimeUnit.MILLISECONDS));
+ configDTO.setConcurrentlySchedulableTaskCount(processorNode.getMaxConcurrentTasks());
+ configDTO.setLossTolerant(processorNode.isLossTolerant());
+ configDTO.setComments(processorNode.getComments());
+ configDTO.setBulletinLevel(processorNode.getBulletinLevel().name());
+ configDTO.setSchedulingStrategy(processorNode.getSchedulingStrategy().name());
+ configDTO.setExecutionNode(processorNode.getExecutionNode().name());
+ configDTO.setAnnotationData(processorNode.getAnnotationData());
+
+ processorDTO.setConfig(configDTO);
+
+ // create the snippet with the processor
+ final FlowSnippetDTO flowSnippetDTO = new FlowSnippetDTO();
+ flowSnippetDTO.setProcessors(Collections.singleton(processorDTO));
+
+ // instantiate the snippet
+ assertEquals(0, controller.getRootGroup().getProcessors().size());
+ controller.instantiateSnippet(controller.getRootGroup(), flowSnippetDTO);
+ assertEquals(1, controller.getRootGroup().getProcessors().size());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInstantiateSnippetWhenControllerServiceMissingBundle() throws ProcessorInstantiationException {
+ final String id = UUID.randomUUID().toString();
+ final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate();
+ final ControllerServiceNode controllerServiceNode = controller.createControllerService(ServiceA.class.getName(), id, coordinate, true);
+
+ // create the controller service dto
+ final ControllerServiceDTO csDto = new ControllerServiceDTO();
+ csDto.setId(UUID.randomUUID().toString()); // use a different id
+ csDto.setParentGroupId(controllerServiceNode.getProcessGroup() == null ? null : controllerServiceNode.getProcessGroup().getIdentifier());
+ csDto.setName(controllerServiceNode.getName());
+ csDto.setType(controllerServiceNode.getCanonicalClassName());
+ csDto.setBundle(null); // missing bundle
+ csDto.setState(controllerServiceNode.getState().name());
+ csDto.setAnnotationData(controllerServiceNode.getAnnotationData());
+ csDto.setComments(controllerServiceNode.getComments());
+ csDto.setPersistsState(controllerServiceNode.getControllerServiceImplementation().getClass().isAnnotationPresent(Stateful.class));
+ csDto.setRestricted(controllerServiceNode.isRestricted());
+ csDto.setExtensionMissing(controllerServiceNode.isExtensionMissing());
+ csDto.setDescriptors(new LinkedHashMap<>());
+ csDto.setProperties(new LinkedHashMap<>());
+
+ // create the snippet with the controller service
+ final FlowSnippetDTO flowSnippetDTO = new FlowSnippetDTO();
+ flowSnippetDTO.setControllerServices(Collections.singleton(csDto));
+
+ // instantiate the snippet
+ assertEquals(0, controller.getRootGroup().getControllerServices(false).size());
+ controller.instantiateSnippet(controller.getRootGroup(), flowSnippetDTO);
+ }
+
+ @Test
+ public void testInstantiateSnippetWithControllerService() throws ProcessorInstantiationException {
+ final String id = UUID.randomUUID().toString();
+ final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate();
+ final ControllerServiceNode controllerServiceNode = controller.createControllerService(ServiceA.class.getName(), id, coordinate, true);
+
+ // create the controller service dto
+ final ControllerServiceDTO csDto = new ControllerServiceDTO();
+ csDto.setId(UUID.randomUUID().toString()); // use a different id
+ csDto.setParentGroupId(controllerServiceNode.getProcessGroup() == null ? null : controllerServiceNode.getProcessGroup().getIdentifier());
+ csDto.setName(controllerServiceNode.getName());
+ csDto.setType(controllerServiceNode.getCanonicalClassName());
+ csDto.setBundle(new BundleDTO(coordinate.getGroup(), coordinate.getId(), coordinate.getVersion()));
+ csDto.setState(controllerServiceNode.getState().name());
+ csDto.setAnnotationData(controllerServiceNode.getAnnotationData());
+ csDto.setComments(controllerServiceNode.getComments());
+ csDto.setPersistsState(controllerServiceNode.getControllerServiceImplementation().getClass().isAnnotationPresent(Stateful.class));
+ csDto.setRestricted(controllerServiceNode.isRestricted());
+ csDto.setExtensionMissing(controllerServiceNode.isExtensionMissing());
+ csDto.setDescriptors(new LinkedHashMap<>());
+ csDto.setProperties(new LinkedHashMap<>());
+
+ // create the snippet with the controller service
+ final FlowSnippetDTO flowSnippetDTO = new FlowSnippetDTO();
+ flowSnippetDTO.setControllerServices(Collections.singleton(csDto));
+
+ // instantiate the snippet
+ assertEquals(0, controller.getRootGroup().getControllerServices(false).size());
+ controller.instantiateSnippet(controller.getRootGroup(), flowSnippetDTO);
+ assertEquals(1, controller.getRootGroup().getControllerServices(false).size());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
index e9623e3..a86b7b3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
@@ -20,6 +20,8 @@ package org.apache.nifi.controller;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
@@ -57,7 +59,6 @@ import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -81,15 +82,17 @@ public class TestStandardProcessorNode {
@Test(timeout = 10000)
public void testStart() throws InterruptedException {
- System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestStandardProcessorNode.class.getResource("/conf/nifi.properties").getFile());
final ProcessorThatThrowsExceptionOnScheduled processor = new ProcessorThatThrowsExceptionOnScheduled();
final String uuid = UUID.randomUUID().toString();
ProcessorInitializationContext initContext = new StandardProcessorInitializationContext(uuid, null, null, null, null);
processor.initialize(initContext);
- final StandardProcessorNode procNode = new StandardProcessorNode(processor, uuid, createValidationContextFactory(), null, null,
- NiFiProperties.createBasicNiFiProperties(null, null), VariableRegistry.EMPTY_REGISTRY, Mockito.mock(ComponentLog.class));
+ final BundleCoordinate coordinate = Mockito.mock(BundleCoordinate.class);
+
+ final LoggableComponent<Processor> loggableComponent = new LoggableComponent<>(processor, coordinate, null);
+ final StandardProcessorNode procNode = new StandardProcessorNode(loggableComponent, uuid, createValidationContextFactory(), null, null,
+ NiFiProperties.createBasicNiFiProperties(null, null), VariableRegistry.EMPTY_REGISTRY);
final ScheduledExecutorService taskScheduler = new FlowEngine(2, "TestClasspathResources", true);
final StandardProcessContext processContext = new StandardProcessContext(procNode, null, null, null, null);
@@ -140,12 +143,6 @@ public class TestStandardProcessorNode {
final ModifiesClasspathProcessor processor = new ModifiesClasspathProcessor(Arrays.asList(classpathProp));
final StandardProcessorNode procNode = createProcessorNode(processor);
- final Set<ClassLoader> classLoaders = new HashSet<>();
- classLoaders.add(procNode.getProcessor().getClass().getClassLoader());
-
- // Load all of the extensions in src/test/java of this project
- ExtensionManager.discoverExtensions(classLoaders);
-
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())){
// Should have an InstanceClassLoader here
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
@@ -189,12 +186,6 @@ public class TestStandardProcessorNode {
final ModifiesClasspathProcessor processor = new ModifiesClasspathProcessor(Arrays.asList(classpathProp, otherProp));
final StandardProcessorNode procNode = createProcessorNode(processor);
- final Set<ClassLoader> classLoaders = new HashSet<>();
- classLoaders.add(procNode.getProcessor().getClass().getClassLoader());
-
- // Load all of the extensions in src/test/java of this project
- ExtensionManager.discoverExtensions(classLoaders);
-
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())){
// Should have an InstanceClassLoader here
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
@@ -263,12 +254,6 @@ public class TestStandardProcessorNode {
final ModifiesClasspathProcessor processor = new ModifiesClasspathProcessor(Arrays.asList(classpathProp1, classpathProp2));
final StandardProcessorNode procNode = createProcessorNode(processor);
- final Set<ClassLoader> classLoaders = new HashSet<>();
- classLoaders.add(procNode.getProcessor().getClass().getClassLoader());
-
- // Load all of the extensions in src/test/java of this project
- ExtensionManager.discoverExtensions(classLoaders);
-
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())){
// Should have an InstanceClassLoader here
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
@@ -315,12 +300,6 @@ public class TestStandardProcessorNode {
final ModifiesClasspathProcessor processor = new ModifiesClasspathProcessor(Arrays.asList(classpathProp1, classpathProp2));
final StandardProcessorNode procNode = createProcessorNode(processor);
- final Set<ClassLoader> classLoaders = new HashSet<>();
- classLoaders.add(procNode.getProcessor().getClass().getClassLoader());
-
- // Load all of the extensions in src/test/java of this project
- ExtensionManager.discoverExtensions(classLoaders);
-
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())){
// Should have an InstanceClassLoader here
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
@@ -359,12 +338,6 @@ public class TestStandardProcessorNode {
final ModifiesClasspathNoAnnotationProcessor processor = new ModifiesClasspathNoAnnotationProcessor();
final StandardProcessorNode procNode = createProcessorNode(processor);
- final Set<ClassLoader> classLoaders = new HashSet<>();
- classLoaders.add(procNode.getProcessor().getClass().getClassLoader());
-
- // Load all of the extensions in src/test/java of this project
- ExtensionManager.discoverExtensions(classLoaders);
-
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())){
// Can't validate the ClassLoader here b/c the class is missing the annotation
@@ -392,18 +365,62 @@ public class TestStandardProcessorNode {
}
}
+ @Test
+ public void testVerifyCanUpdateBundle() {
+ final ModifiesClasspathNoAnnotationProcessor processor = new ModifiesClasspathNoAnnotationProcessor();
+ final StandardProcessorNode procNode = createProcessorNode(processor);
+ final BundleCoordinate existingCoordinate = procNode.getBundleCoordinate();
+
+ // should be allowed to update when the bundle is the same
+ procNode.verifyCanUpdateBundle(existingCoordinate);
+
+ // should be allowed to update when the group and id are the same but version is different
+ final BundleCoordinate diffVersion = new BundleCoordinate(existingCoordinate.getGroup(), existingCoordinate.getId(), "v2");
+ assertTrue(!existingCoordinate.getVersion().equals(diffVersion.getVersion()));
+ procNode.verifyCanUpdateBundle(diffVersion);
+
+ // should not be allowed to update when the bundle id is different
+ final BundleCoordinate diffId = new BundleCoordinate(existingCoordinate.getGroup(), "different-id", existingCoordinate.getVersion());
+ assertTrue(!existingCoordinate.getId().equals(diffId.getId()));
+ try {
+ procNode.verifyCanUpdateBundle(diffId);
+ Assert.fail("Should have thrown exception");
+ } catch (Exception e) {
+
+ }
+
+ // should not be allowed to update when the bundle group is different
+ final BundleCoordinate diffGroup = new BundleCoordinate("different-group", existingCoordinate.getId(), existingCoordinate.getVersion());
+ assertTrue(!existingCoordinate.getGroup().equals(diffGroup.getGroup()));
+ try {
+ procNode.verifyCanUpdateBundle(diffGroup);
+ Assert.fail("Should have thrown exception");
+ } catch (Exception e) {
+
+ }
+ }
+
+ @Test
+ public void testValidateControllerServiceApiRequired() {
+
+ }
+
private StandardProcessorNode createProcessorNode(Processor processor) {
final String uuid = UUID.randomUUID().toString();
final ValidationContextFactory validationContextFactory = createValidationContextFactory();
- final NiFiProperties niFiProperties = NiFiProperties.createBasicNiFiProperties(null, null);
+ final NiFiProperties niFiProperties = NiFiProperties.createBasicNiFiProperties("src/test/resources/conf/nifi.properties", null);
final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class);
final ComponentLog componentLog = Mockito.mock(ComponentLog.class);
+ final Bundle systemBundle = ExtensionManager.createSystemBundle(niFiProperties);
+ ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet());
+ ExtensionManager.createInstanceClassLoader(processor.getClass().getName(), uuid, systemBundle);
+
ProcessorInitializationContext initContext = new StandardProcessorInitializationContext(uuid, componentLog, null, null, null);
processor.initialize(initContext);
- return new StandardProcessorNode(processor, uuid, validationContextFactory, processScheduler, null,
- niFiProperties, variableRegistry, componentLog);
+ final LoggableComponent<Processor> loggableComponent = new LoggableComponent<>(processor, systemBundle.getBundleDetails().getCoordinate(), componentLog);
+ return new StandardProcessorNode(loggableComponent, uuid, validationContextFactory, processScheduler, null, niFiProperties, variableRegistry);
}
private boolean containsResource(URL[] resources, URL resourceToFind) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
index 46d96be..5c8d447 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
@@ -16,35 +16,13 @@
*/
package org.apache.nifi.controller.scheduling;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-
-import java.io.File;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.LockSupport;
-
import org.apache.commons.io.FileUtils;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@@ -60,6 +38,7 @@ import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.events.VolatileBulletinRepository;
import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -75,6 +54,29 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
/**
* Validate Processor's life-cycle operation within the context of
* {@link FlowController} and {@link StandardProcessScheduler}
@@ -100,11 +102,12 @@ public class TestProcessorLifecycle {
@Test
public void validateEnableOperation() throws Exception {
- fc = this.buildFlowControllerForTest();
+ final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+ fc = fcsb.getFlowController();
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(),
- UUID.randomUUID().toString());
+ UUID.randomUUID().toString(), fcsb.getSystemBundle().getBundleDetails().getCoordinate());
assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState());
assertEquals(ScheduledState.STOPPED, testProcNode.getPhysicalScheduledState());
@@ -121,11 +124,13 @@ public class TestProcessorLifecycle {
@Test
public void validateDisableOperation() throws Exception {
- fc = this.buildFlowControllerForTest();
+ final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+ fc = fcsb.getFlowController();
+
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(),
- UUID.randomUUID().toString());
+ UUID.randomUUID().toString(), fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testProcNode.setProperties(properties);
assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState());
assertEquals(ScheduledState.STOPPED, testProcNode.getPhysicalScheduledState());
@@ -147,10 +152,13 @@ public class TestProcessorLifecycle {
*/
@Test
public void validateIdempotencyOfProcessorStartOperation() throws Exception {
- fc = this.buildFlowControllerForTest();
+ final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+ fc = fcsb.getFlowController();
+
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
- final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
+ final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+ fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
@@ -173,10 +181,12 @@ public class TestProcessorLifecycle {
*/
@Test
public void validateStopCallsAreMeaninglessIfProcessorNotStarted() throws Exception {
- fc = this.buildFlowControllerForTest();
+ final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+ fc = fcsb.getFlowController();
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
- final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
+ final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+ fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
@@ -196,10 +206,12 @@ public class TestProcessorLifecycle {
@Test
@Ignore
public void validateSuccessfullAndOrderlyShutdown() throws Exception {
- fc = this.buildFlowControllerForTest();
+ final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+ fc = fcsb.getFlowController();
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
- ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
+ ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+ fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
@@ -239,10 +251,13 @@ public class TestProcessorLifecycle {
@Test
@Ignore
public void validateLifecycleOperationOrderWithConcurrentCallsToStartStop() throws Exception {
- fc = this.buildFlowControllerForTest();
+ final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+ fc = fcsb.getFlowController();
+
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
- final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
+ final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+ fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
@@ -295,10 +310,13 @@ public class TestProcessorLifecycle {
*/
@Test
public void validateProcessorUnscheduledAndStoppedWhenStopIsCalledBeforeProcessorFullyStarted() throws Exception {
- fc = this.buildFlowControllerForTest();
+ final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+ fc = fcsb.getFlowController();
+
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
- ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
+ ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+ fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
@@ -331,10 +349,13 @@ public class TestProcessorLifecycle {
*/
@Test
public void validateProcessScheduledAfterAdministrativeDelayDueToTheOnScheduledException() throws Exception {
- fc = this.buildFlowControllerForTest();
+ final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+ fc = fcsb.getFlowController();
+
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
- ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
+ ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+ fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
@@ -363,10 +384,13 @@ public class TestProcessorLifecycle {
*/
@Test
public void validateProcessorCanBeStoppedWhenOnScheduledConstantlyFails() throws Exception {
- fc = this.buildFlowControllerForTest();
+ final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+ fc = fcsb.getFlowController();
+
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
- ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
+ ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+ fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
@@ -394,10 +418,13 @@ public class TestProcessorLifecycle {
*/
@Test
public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyInterruptable() throws Exception {
- this.fc = buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec");
+ final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec");
+ fc = fcsb.getFlowController();
+
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
- ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
+ ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+ fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
// sets the scenario for the processor to run
@@ -422,10 +449,13 @@ public class TestProcessorLifecycle {
*/
@Test
public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyUninterruptable() throws Exception {
- this.fc = buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec");
+ final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec");
+ fc = fcsb.getFlowController();
+
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
- ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
+ ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+ fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
// sets the scenario for the processor to run
@@ -455,10 +485,13 @@ public class TestProcessorLifecycle {
*/
@Test
public void validateProcessorCanBeStoppedWhenOnTriggerThrowsException() throws Exception {
- fc = this.buildFlowControllerForTest();
+ final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+ fc = fcsb.getFlowController();
+
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
- ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
+ ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+ fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
@@ -484,10 +517,13 @@ public class TestProcessorLifecycle {
*/
@Test(expected = IllegalStateException.class)
public void validateStartFailsOnInvalidProcessorWithMissingProperty() throws Exception {
- fc = this.buildFlowControllerForTest();
+ final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+ fc = fcsb.getFlowController();
+
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
- ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
+ ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+ fcsb.getSystemBundle().getBundleDetails().getCoordinate());
ProcessScheduler ps = fc.getProcessScheduler();
ps.startProcessor(testProcNode);
fail();
@@ -499,12 +535,16 @@ public class TestProcessorLifecycle {
*/
@Test(expected = IllegalStateException.class)
public void validateStartFailsOnInvalidProcessorWithDisabledService() throws Exception {
- fc = this.buildFlowControllerForTest();
+ final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+ fc = fcsb.getFlowController();
+
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
- ControllerServiceNode testServiceNode = fc.createControllerService(TestService.class.getName(), "serv", true);
- ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
+ ControllerServiceNode testServiceNode = fc.createControllerService(TestService.class.getName(), "serv",
+ fcsb.getSystemBundle().getBundleDetails().getCoordinate(), true);
+ ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+ fcsb.getSystemBundle().getBundleDetails().getCoordinate());
properties.put("S", testServiceNode.getIdentifier());
testProcNode.setProperties(properties);
@@ -522,14 +562,18 @@ public class TestProcessorLifecycle {
*/
@Test
public void validateStartSucceedsOnProcessorWithEnabledService() throws Exception {
- fc = this.buildFlowControllerForTest();
+ final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+ fc = fcsb.getFlowController();
+
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
- ControllerServiceNode testServiceNode = fc.createControllerService(TestService.class.getName(), "foo", true);
+ ControllerServiceNode testServiceNode = fc.createControllerService(TestService.class.getName(), "foo",
+ fcsb.getSystemBundle().getBundleDetails().getCoordinate(), true);
testGroup.addControllerService(testServiceNode);
- ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
+ ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+ fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testGroup.addProcessor(testProcNode);
properties.put("S", testServiceNode.getIdentifier());
@@ -554,15 +598,19 @@ public class TestProcessorLifecycle {
*/
@Test
public void validateProcessorDeletion() throws Exception {
- fc = this.buildFlowControllerForTest();
+ final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
+ fc = fcsb.getFlowController();
+
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
- ProcessorNode testProcNodeA = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
+ ProcessorNode testProcNodeA = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+ fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testProcNodeA.setProperties(properties);
testGroup.addProcessor(testProcNodeA);
- ProcessorNode testProcNodeB = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
+ ProcessorNode testProcNodeB = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+ fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testProcNodeB.setProperties(properties);
testGroup.addProcessor(testProcNodeB);
@@ -670,7 +718,7 @@ public class TestProcessorLifecycle {
testProcessor.setScenario(emptyRunnable, emptyRunnable, emptyRunnable, emptyRunnable);
}
- private FlowController buildFlowControllerForTest(final String propKey, final String propValue) throws Exception {
+ private FlowControllerAndSystemBundle buildFlowControllerForTest(final String propKey, final String propValue) throws Exception {
final Map<String, String> addProps = new HashMap<>();
addProps.put(NiFiProperties.ADMINISTRATIVE_YIELD_DURATION, "1 sec");
addProps.put(NiFiProperties.STATE_MANAGEMENT_CONFIG_FILE, "target/test-classes/state-management.xml");
@@ -682,12 +730,18 @@ public class TestProcessorLifecycle {
addProps.put(propKey, propValue);
}
final NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties(null, addProps);
- return FlowController.createStandaloneInstance(mock(FlowFileEventRepository.class), nifiProperties,
+
+ final Bundle systemBundle = ExtensionManager.createSystemBundle(nifiProperties);
+ ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet());
+
+ final FlowController flowController = FlowController.createStandaloneInstance(mock(FlowFileEventRepository.class), nifiProperties,
mock(Authorizer.class), mock(AuditService.class), null, new VolatileBulletinRepository(),
new FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths()));
+
+ return new FlowControllerAndSystemBundle(flowController, systemBundle);
}
- private FlowController buildFlowControllerForTest() throws Exception {
+ private FlowControllerAndSystemBundle buildFlowControllerForTest() throws Exception {
return buildFlowControllerForTest(null, null);
}
@@ -705,6 +759,25 @@ public class TestProcessorLifecycle {
}
}
+ private static class FlowControllerAndSystemBundle {
+
+ private final FlowController flowController;
+ private final Bundle systemBundle;
+
+ public FlowControllerAndSystemBundle(FlowController flowController, Bundle systemBundle) {
+ this.flowController = flowController;
+ this.systemBundle = systemBundle;
+ }
+
+ public FlowController getFlowController() {
+ return flowController;
+ }
+
+ public Bundle getSystemBundle() {
+ return systemBundle;
+ }
+ }
+
/**
*/
public static class TestProcessor extends AbstractProcessor {
http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
index ee2b103..b69701e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
@@ -16,29 +16,16 @@
*/
package org.apache.nifi.controller.scheduling;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.LoggableComponent;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
@@ -55,6 +42,7 @@ import org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.controller.service.mock.MockProcessGroup;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -67,6 +55,7 @@ import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingInitializationContext;
+import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.NiFiProperties;
import org.junit.Before;
@@ -74,6 +63,23 @@ import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
public class TestStandardProcessScheduler {
private StandardProcessScheduler scheduler = null;
@@ -84,11 +90,17 @@ public class TestStandardProcessScheduler {
private FlowController controller;
private ProcessGroup rootGroup;
private NiFiProperties nifiProperties;
+ private Bundle systemBundle;
@Before
public void setup() throws InitializationException {
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestStandardProcessScheduler.class.getResource("/nifi.properties").getFile());
this.nifiProperties = NiFiProperties.createBasicNiFiProperties(null, null);
+
+ // load the system bundle
+ systemBundle = ExtensionManager.createSystemBundle(nifiProperties);
+ ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet());
+
scheduler = new StandardProcessScheduler(Mockito.mock(ControllerServiceProvider.class), null, stateMgrProvider, variableRegistry, nifiProperties);
scheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, Mockito.mock(SchedulingAgent.class));
@@ -99,7 +111,8 @@ public class TestStandardProcessScheduler {
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(null, variableRegistry);
final ComponentLog logger = Mockito.mock(ComponentLog.class);
- taskNode = new StandardReportingTaskNode(reportingTask, UUID.randomUUID().toString(), null, scheduler, validationContextFactory, variableRegistry, logger);
+ final LoggableComponent<ReportingTask> loggableComponent = new LoggableComponent<>(reportingTask, systemBundle.getBundleDetails().getCoordinate(), logger);
+ taskNode = new StandardReportingTaskNode(loggableComponent, UUID.randomUUID().toString(), null, scheduler, validationContextFactory, variableRegistry);
controller = Mockito.mock(FlowController.class);
rootGroup = new MockProcessGroup();
@@ -139,13 +152,14 @@ public class TestStandardProcessScheduler {
final StandardControllerServiceProvider serviceProvider =
new StandardControllerServiceProvider(controller, scheduler, null, Mockito.mock(StateManagerProvider.class), variableRegistry, nifiProperties);
- final ControllerServiceNode service = serviceProvider.createControllerService(NoStartServiceImpl.class.getName(), "service", true);
+ final ControllerServiceNode service = serviceProvider.createControllerService(NoStartServiceImpl.class.getName(), "service",
+ systemBundle.getBundleDetails().getCoordinate(), true);
rootGroup.addControllerService(service);
- final ProcessorNode procNode = new StandardProcessorNode(proc, uuid,
+ final LoggableComponent<Processor> loggableComponent = new LoggableComponent<>(proc, systemBundle.getBundleDetails().getCoordinate(), null);
+ final ProcessorNode procNode = new StandardProcessorNode(loggableComponent, uuid,
new StandardValidationContextFactory(serviceProvider, variableRegistry),
- scheduler, serviceProvider, nifiProperties, VariableRegistry.EMPTY_REGISTRY,
- Mockito.mock(ComponentLog.class));
+ scheduler, serviceProvider, nifiProperties, VariableRegistry.EMPTY_REGISTRY);
rootGroup.addProcessor(procNode);
Map<String,String> procProps = new HashMap<>();
@@ -219,7 +233,7 @@ public class TestStandardProcessScheduler {
final ProcessScheduler scheduler = createScheduler();
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
- "1", false);
+ "1", systemBundle.getBundleDetails().getCoordinate(), false);
assertFalse(serviceNode.isActive());
final SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
final ExecutorService executor = Executors.newCachedThreadPool();
@@ -258,7 +272,7 @@ public class TestStandardProcessScheduler {
final ProcessScheduler scheduler = createScheduler();
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
- "1", false);
+ "1", systemBundle.getBundleDetails().getCoordinate(), false);
final SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
final ExecutorService executor = Executors.newCachedThreadPool();
@@ -296,7 +310,7 @@ public class TestStandardProcessScheduler {
final ProcessScheduler scheduler = createScheduler();
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
- "1", false);
+ "1", systemBundle.getBundleDetails().getCoordinate(), false);
final SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
scheduler.enableControllerService(serviceNode);
assertTrue(serviceNode.isActive());
@@ -330,7 +344,7 @@ public class TestStandardProcessScheduler {
final ProcessScheduler scheduler = createScheduler();
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
final ControllerServiceNode serviceNode = provider.createControllerService(FailingService.class.getName(),
- "1", false);
+ "1", systemBundle.getBundleDetails().getCoordinate(), false);
scheduler.enableControllerService(serviceNode);
Thread.sleep(1000);
scheduler.shutdown();
@@ -363,8 +377,8 @@ public class TestStandardProcessScheduler {
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
final ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 200; i++) {
- final ControllerServiceNode serviceNode = provider
- .createControllerService(RandomShortDelayEnablingService.class.getName(), "1", false);
+ final ControllerServiceNode serviceNode = provider.createControllerService(RandomShortDelayEnablingService.class.getName(), "1",
+ systemBundle.getBundleDetails().getCoordinate(), false);
executor.execute(new Runnable() {
@Override
@@ -405,7 +419,7 @@ public class TestStandardProcessScheduler {
final ProcessScheduler scheduler = createScheduler();
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(),
- "1", false);
+ "1", systemBundle.getBundleDetails().getCoordinate(), false);
final LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation();
ts.setLimit(Long.MAX_VALUE);
scheduler.enableControllerService(serviceNode);
@@ -431,7 +445,7 @@ public class TestStandardProcessScheduler {
final ProcessScheduler scheduler = createScheduler();
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(),
- "1", false);
+ "1", systemBundle.getBundleDetails().getCoordinate(), false);
final LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation();
ts.setLimit(3000);
scheduler.enableControllerService(serviceNode);
http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
index 0e4571a..15c35d9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.controller.service;
+import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.controller.ControllerService;
@@ -30,19 +31,26 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
+
public class StandardControllerServiceProviderTest {
private ControllerService proxied;
private ControllerService implementation;
private static VariableRegistry variableRegistry;
private static NiFiProperties nifiProperties;
+ private static Bundle systemBundle;
@BeforeClass
public static void setupSuite() throws Exception {
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, StandardControllerServiceProviderTest.class.getResource("/conf/nifi.properties").getFile());
nifiProperties = NiFiProperties.createBasicNiFiProperties(null, null);
+
NarClassLoaders.getInstance().init(nifiProperties.getFrameworkWorkingDirectory(), nifiProperties.getExtensionsWorkingDirectory());
- ExtensionManager.discoverExtensions(NarClassLoaders.getInstance().getExtensionClassLoaders());
+
+ // load the system bundle
+ systemBundle = ExtensionManager.createSystemBundle(nifiProperties);
+ ExtensionManager.discoverExtensions(systemBundle, NarClassLoaders.getInstance().getBundles());
+
variableRegistry = new FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths());
}
@@ -72,7 +80,7 @@ public class StandardControllerServiceProviderTest {
public void onComponentRemoved(String componentId) {
}
}, variableRegistry, nifiProperties);
- ControllerServiceNode node = provider.createControllerService(clazz, id, true);
+ ControllerServiceNode node = provider.createControllerService(clazz, id, systemBundle.getBundleDetails().getCoordinate(), true);
proxied = node.getProxiedControllerService();
implementation = node.getControllerServiceImplementation();
}