You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/19 19:30:51 UTC
[1/5] incubator-nifi git commit: NIFI-250: Deleted dead code
Repository: incubator-nifi
Updated Branches:
refs/heads/NIFI-250 fac6cd7ac -> 7de30ab15
NIFI-250: Deleted dead code
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/d30a1843
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/d30a1843
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/d30a1843
Branch: refs/heads/NIFI-250
Commit: d30a1843c24caccf822811649332dad7e1aa5b17
Parents: 371e010
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Feb 11 16:08:20 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Feb 11 16:08:20 2015 -0500
----------------------------------------------------------------------
.../nifi/controller/StandardFlowService.java | 7 +-
.../controller/StandardFlowSynchronizer.java | 2 +-
.../nifi/persistence/FlowConfigurationDAO.java | 13 --
.../StandardXMLFlowConfigurationDAO.java | 180 +------------------
4 files changed, 3 insertions(+), 199 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d30a1843/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 790485c..f62c842 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -81,8 +81,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
private final FlowController controller;
private final Path flowXml;
- private final Path taskConfigXml;
- private final Path serviceConfigXml;
private final FlowConfigurationDAO dao;
private final int gracefulShutdownSeconds;
private final boolean autoResumeState;
@@ -154,14 +152,12 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
this.controller = controller;
this.encryptor = encryptor;
flowXml = Paths.get(properties.getProperty(NiFiProperties.FLOW_CONFIGURATION_FILE));
- taskConfigXml = Paths.get(properties.getProperty(NiFiProperties.TASK_CONFIGURATION_FILE));
- serviceConfigXml = Paths.get(properties.getProperty(NiFiProperties.SERVICE_CONFIGURATION_FILE));
gracefulShutdownSeconds = (int) FormatUtils.getTimeDuration(properties.getProperty(NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD), TimeUnit.SECONDS);
autoResumeState = properties.getAutoResumeState();
connectionRetryMillis = (int) FormatUtils.getTimeDuration(properties.getClusterManagerFlowRetrievalDelay(), TimeUnit.MILLISECONDS);
- dao = new StandardXMLFlowConfigurationDAO(flowXml, taskConfigXml, serviceConfigXml, encryptor);
+ dao = new StandardXMLFlowConfigurationDAO(flowXml, encryptor);
if (configuredForClustering) {
@@ -608,7 +604,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
if (firstControllerInitialization) {
// load the controller services
logger.debug("Loading controller services");
-// dao.loadControllerServices(controller);
}
// load the flow
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d30a1843/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index efeabad..2508b5b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -361,7 +361,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
}
if ( autoResumeState ) {
- if ( dto.getEnabled() == Boolean.TRUE ) {
+ if ( Boolean.TRUE.equals(dto.getEnabled()) ) {
try {
controller.enableControllerService(node);
} catch (final Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d30a1843/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java
index 8cab916..8957314 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java
@@ -19,13 +19,11 @@ package org.apache.nifi.persistence;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.List;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.FlowSerializationException;
import org.apache.nifi.controller.FlowSynchronizationException;
-import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.UninheritableFlowException;
/**
@@ -109,15 +107,4 @@ public interface FlowConfigurationDAO {
*/
void save(FlowController flow, boolean archive) throws IOException;
- /**
- * Instantiates and schedules all controller tasks from the file used in the
- * constructor
- *
- * @param controller
- * @return
- * @throws java.io.IOException
- * @returns all of the ReportingTasks that were instantiated & scheduled
- */
- List<ReportingTaskNode> loadReportingTasks(FlowController controller) throws IOException;
-
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d30a1843/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
index 3000869..b93ae8a 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
@@ -21,68 +21,36 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
-import javax.xml.XMLConstants;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
-import javax.xml.validation.Validator;
-
import org.apache.nifi.cluster.protocol.DataFlow;
-import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.FlowSerializationException;
import org.apache.nifi.controller.FlowSynchronizationException;
import org.apache.nifi.controller.FlowSynchronizer;
-import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.StandardFlowSerializer;
import org.apache.nifi.controller.StandardFlowSynchronizer;
import org.apache.nifi.controller.UninheritableFlowException;
-import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
-import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
import org.apache.nifi.encrypt.StringEncryptor;
-import org.apache.nifi.nar.NarCloseable;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.reporting.ReportingInitializationContext;
-import org.apache.nifi.reporting.ReportingTask;
-import org.apache.nifi.scheduling.SchedulingStrategy;
-import org.apache.nifi.util.DomUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.file.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.w3c.dom.DOMException;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.NodeList;
-import org.xml.sax.SAXException;
-import org.xml.sax.SAXParseException;
public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationDAO {
public static final String CONFIGURATION_ARCHIVE_DIR_KEY = "nifi.flow.configuration.archive.dir";
private final Path flowXmlPath;
- private final Path taskConfigXmlPath;
private final StringEncryptor encryptor;
private static final Logger LOG = LoggerFactory.getLogger(StandardXMLFlowConfigurationDAO.class);
- public StandardXMLFlowConfigurationDAO(final Path flowXml, final Path taskConfigXml, final Path serviceConfigXml, final StringEncryptor encryptor) throws IOException {
+ public StandardXMLFlowConfigurationDAO(final Path flowXml, final StringEncryptor encryptor) throws IOException {
final File flowXmlFile = flowXml.toFile();
if (!flowXmlFile.exists()) {
Files.createDirectories(flowXml.getParent());
@@ -92,13 +60,7 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD
throw new IOException(flowXml + " exists but you have insufficient read/write privileges");
}
- final File taskConfigXmlFile = Objects.requireNonNull(taskConfigXml).toFile();
- if ((!taskConfigXmlFile.exists() || !taskConfigXmlFile.canRead())) {
- throw new IOException(taskConfigXml + " does not appear to exist or cannot be read. Cannot load configuration.");
- }
-
this.flowXmlPath = flowXml;
- this.taskConfigXmlPath = taskConfigXml;
this.encryptor = encryptor;
}
@@ -193,144 +155,4 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD
}
}
- @Override
- public List<ReportingTaskNode> loadReportingTasks(final FlowController controller) {
- final List<ReportingTaskNode> tasks = new ArrayList<>();
- if (taskConfigXmlPath == null) {
- LOG.info("No reporting tasks to start");
- return tasks;
- }
-
- try {
- final URL schemaUrl = getClass().getResource("/ReportingTaskConfiguration.xsd");
- final Document document = parse(taskConfigXmlPath.toFile(), schemaUrl);
-
- final NodeList tasksNodes = document.getElementsByTagName("tasks");
- final Element tasksElement = (Element) tasksNodes.item(0);
-
- for (final Element taskElement : DomUtils.getChildElementsByTagName(tasksElement, "task")) {
- //add global properties common to all tasks
- Map<String, String> properties = new HashMap<>();
-
- //get properties for the specific reporting task - id, name, class,
- //and schedulingPeriod must be set
- final String taskName = DomUtils.getChild(taskElement, "name").getTextContent().trim();
-
- final List<Element> schedulingStrategyNodeList = DomUtils.getChildElementsByTagName(taskElement, "schedulingStrategy");
- String schedulingStrategyValue = SchedulingStrategy.TIMER_DRIVEN.name();
- if (schedulingStrategyNodeList.size() == 1) {
- final String specifiedValue = schedulingStrategyNodeList.get(0).getTextContent();
-
- try {
- schedulingStrategyValue = SchedulingStrategy.valueOf(specifiedValue).name();
- } catch (final Exception e) {
- throw new RuntimeException("Cannot start Reporting Task with name " + taskName + " because its Scheduling Strategy does not have a valid value", e);
- }
- }
-
- final SchedulingStrategy schedulingStrategy = SchedulingStrategy.valueOf(schedulingStrategyValue);
- final String taskSchedulingPeriod = DomUtils.getChild(taskElement, "schedulingPeriod").getTextContent().trim();
- final String taskClass = DomUtils.getChild(taskElement, "class").getTextContent().trim();
-
- //optional task-specific properties
- for (final Element optionalProperty : DomUtils.getChildElementsByTagName(taskElement, "property")) {
- final String name = optionalProperty.getAttribute("name");
- final String value = optionalProperty.getTextContent().trim();
- properties.put(name, value);
- }
-
- //set the class to be used for the configured reporting task
- final ReportingTaskNode reportingTaskNode;
- try {
- reportingTaskNode = controller.createReportingTask(taskClass);
- } catch (final ReportingTaskInstantiationException e) {
- LOG.error("Unable to load reporting task {} due to {}", new Object[]{taskName, e});
- if (LOG.isDebugEnabled()) {
- LOG.error("", e);
- }
- continue;
- }
-
- reportingTaskNode.setName(taskName);
- reportingTaskNode.setScheduldingPeriod(taskSchedulingPeriod);
- reportingTaskNode.setSchedulingStrategy(schedulingStrategy);
-
- final ReportingTask reportingTask = reportingTaskNode.getReportingTask();
-
- final ReportingInitializationContext config = new StandardReportingInitializationContext(
- reportingTask.getIdentifier(), taskName, schedulingStrategy, taskSchedulingPeriod, controller);
- reportingTask.initialize(config);
-
- final Map<PropertyDescriptor, String> resolvedProps;
- try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
- resolvedProps = new HashMap<>();
- for (final Map.Entry<String, String> entry : properties.entrySet()) {
- final PropertyDescriptor descriptor = reportingTask.getPropertyDescriptor(entry.getKey());
- resolvedProps.put(descriptor, entry.getValue());
- }
- }
-
- for (final Map.Entry<PropertyDescriptor, String> entry : resolvedProps.entrySet()) {
- reportingTaskNode.setProperty(entry.getKey().getName(), entry.getValue());
- }
-
- tasks.add(reportingTaskNode);
- controller.startReportingTask(reportingTaskNode);
- }
- } catch (final SAXException | ParserConfigurationException | IOException | DOMException | NumberFormatException | InitializationException t) {
- LOG.error("Unable to load reporting tasks from {} due to {}", new Object[]{taskConfigXmlPath, t});
- if (LOG.isDebugEnabled()) {
- LOG.error("", t);
- }
- }
-
- return tasks;
- }
-
-
- private Document parse(final File xmlFile, final URL schemaUrl) throws SAXException, ParserConfigurationException, IOException {
- final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
- final Schema schema = schemaFactory.newSchema(schemaUrl);
- final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
- docFactory.setSchema(schema);
- final DocumentBuilder builder = docFactory.newDocumentBuilder();
-
- builder.setErrorHandler(new org.xml.sax.ErrorHandler() {
- @Override
- public void fatalError(final SAXParseException err) throws SAXException {
- LOG.error("Config file line " + err.getLineNumber() + ", col " + err.getColumnNumber() + ", uri " + err.getSystemId() + " :message: " + err.getMessage());
- if (LOG.isDebugEnabled()) {
- LOG.error("Error Stack Dump", err);
- }
- throw err;
- }
-
- @Override
- public void error(final SAXParseException err) throws SAXParseException {
- LOG.error("Config file line " + err.getLineNumber() + ", col " + err.getColumnNumber() + ", uri " + err.getSystemId() + " :message: " + err.getMessage());
- if (LOG.isDebugEnabled()) {
- LOG.error("Error Stack Dump", err);
- }
- throw err;
- }
-
- @Override
- public void warning(final SAXParseException err) throws SAXParseException {
- LOG.warn(" Config file line " + err.getLineNumber() + ", uri " + err.getSystemId() + " : message : " + err.getMessage());
- if (LOG.isDebugEnabled()) {
- LOG.warn("Warning stack dump", err);
- }
- throw err;
- }
- });
-
- // build the docuemnt
- final Document document = builder.parse(xmlFile);
-
- // ensure schema compliance
- final Validator validator = schema.newValidator();
- validator.validate(new DOMSource(document));
-
- return document;
- }
}
[5/5] incubator-nifi git commit: NIFI-250: Refactoring of controller service states
Posted by ma...@apache.org.
NIFI-250: Refactoring of controller service states
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/7de30ab1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/7de30ab1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/7de30ab1
Branch: refs/heads/NIFI-250
Commit: 7de30ab15ad9570233c4bff68f37acf324a66dda
Parents: 81d8454
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Feb 19 13:30:37 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Feb 19 13:30:37 2015 -0500
----------------------------------------------------------------------
.../nifi/components/PropertyDescriptor.java | 8 +
.../nifi/components/ValidationContext.java | 9 +
.../apache/nifi/util/MockValidationContext.java | 5 +
.../cluster/manager/impl/WebClusterManager.java | 30 ++-
.../nifi-framework-core-api/.gitignore | 1 +
.../controller/AbstractConfiguredComponent.java | 8 +-
.../nifi/controller/ProcessScheduler.java | 16 +-
.../apache/nifi/controller/ProcessorNode.java | 11 +
.../nifi/controller/ReportingTaskNode.java | 12 +
.../controller/ValidationContextFactory.java | 4 +
.../service/ControllerServiceNode.java | 24 +-
.../service/ControllerServiceProvider.java | 52 ++--
.../service/ControllerServiceState.java | 45 ++++
.../apache/nifi/controller/FlowController.java | 33 ++-
.../nifi/controller/StandardFlowSerializer.java | 6 +-
.../nifi/controller/StandardProcessorNode.java | 37 ++-
.../reporting/AbstractReportingTaskNode.java | 33 +++
.../scheduling/StandardProcessScheduler.java | 115 +++++++-
.../service/StandardControllerServiceNode.java | 79 +++---
.../StandardControllerServiceProvider.java | 270 +++++++++++--------
.../StandardControllerServiceReference.java | 3 +-
.../processor/StandardSchedulingContext.java | 5 +-
.../processor/StandardValidationContext.java | 14 +
.../StandardValidationContextFactory.java | 5 +
.../TestStandardControllerServiceProvider.java | 119 +++++++-
.../controller/service/mock/DummyProcessor.java | 49 ++++
.../dao/impl/StandardControllerServiceDAO.java | 13 +-
27 files changed, 798 insertions(+), 208 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java b/nifi/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java
index a4c855b..48b9645 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java
@@ -142,6 +142,14 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor>
final Set<String> validIdentifiers = context.getControllerServiceLookup().getControllerServiceIdentifiers(controllerServiceDefinition);
if (validIdentifiers != null && validIdentifiers.contains(input)) {
final ControllerService controllerService = context.getControllerServiceLookup().getControllerService(input);
+ if ( !context.isValidationRequired(controllerService) ) {
+ return new ValidationResult.Builder()
+ .input(input)
+ .subject(getName())
+ .valid(true)
+ .build();
+ }
+
if (!context.getControllerServiceLookup().isControllerServiceEnabled(controllerService)) {
return new ValidationResult.Builder()
.input(context.getControllerServiceLookup().getControllerServiceName(controllerService.getIdentifier()))
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java b/nifi/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java
index e50f002..214fac9 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java
@@ -79,4 +79,13 @@ public interface ValidationContext {
* @return
*/
String getAnnotationData();
+
+ /**
+ * There are times when the framework needs to consider a component valid, even if it
+ * references an invalid ControllerService. This method will return <code>false</code>
+ * if the component is to be considered valid even if the given Controller Service is referenced
+ * and is invalid.
+ * @param service
+ */
+ boolean isValidationRequired(ControllerService service);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java
index 34fd7de..6d32d0b 100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java
@@ -96,4 +96,9 @@ public class MockValidationContext implements ValidationContext, ControllerServi
final ControllerServiceConfiguration configuration = context.getConfiguration(serviceIdentifier);
return configuration == null ? null : serviceIdentifier;
}
+
+ @Override
+ public boolean isValidationRequired(final ControllerService service) {
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index ee3c621..3e522b9 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -1357,16 +1357,38 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
return controllerServiceProvider.getAllControllerServices();
}
+
+ @Override
+ public void disableReferencingServices(final ControllerServiceNode serviceNode) {
+ controllerServiceProvider.disableReferencingServices(serviceNode);
+ }
+
+ @Override
+ public void enableReferencingServices(final ControllerServiceNode serviceNode) {
+ controllerServiceProvider.enableReferencingServices(serviceNode);
+ }
+
@Override
- public void activateReferencingComponents(final ControllerServiceNode serviceNode) {
- controllerServiceProvider.activateReferencingComponents(serviceNode);
+ public void scheduleReferencingComponents(final ControllerServiceNode serviceNode) {
+ controllerServiceProvider.scheduleReferencingComponents(serviceNode);
}
@Override
- public void deactivateReferencingComponents(final ControllerServiceNode serviceNode) {
- controllerServiceProvider.deactivateReferencingComponents(serviceNode);
+ public void unscheduleReferencingComponents(final ControllerServiceNode serviceNode) {
+ controllerServiceProvider.unscheduleReferencingComponents(serviceNode);
}
+ @Override
+ public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) {
+ controllerServiceProvider.verifyCanEnableReferencingServices(serviceNode);
+ }
+
+ @Override
+ public void verifyCanScheduleReferencingComponents(final ControllerServiceNode serviceNode) {
+ controllerServiceProvider.verifyCanScheduleReferencingComponents(serviceNode);
+ }
+
+
private byte[] serialize(final Document doc) throws TransformerException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DOMSource domSource = new DOMSource(doc);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/.gitignore
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/.gitignore b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/.gitignore
index ea8c4bf..29546b5 100755
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/.gitignore
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/.gitignore
@@ -1 +1,2 @@
/target
+/target/
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
index af65b41..c44161f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
@@ -23,6 +23,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
@@ -260,12 +261,17 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
return true;
}
+
@Override
public Collection<ValidationResult> getValidationErrors() {
+ return getValidationErrors(Collections.<String>emptySet());
+ }
+
+ public Collection<ValidationResult> getValidationErrors(final Set<String> serviceIdentifiersNotToValidate) {
final List<ValidationResult> results = new ArrayList<>();
lock.lock();
try {
- final ValidationContext validationContext = validationContextFactory.newValidationContext(getProperties(), getAnnotationData());
+ final ValidationContext validationContext = validationContextFactory.newValidationContext(serviceIdentifiersNotToValidate, getProperties(), getAnnotationData());
final Collection<ValidationResult> validationResults;
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
index 724d1f2..c3b6613 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
@@ -19,8 +19,7 @@ package org.apache.nifi.controller;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
-import org.apache.nifi.processor.annotation.OnScheduled;
-import org.apache.nifi.processor.annotation.OnUnscheduled;
+import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.scheduling.SchedulingStrategy;
public interface ProcessScheduler {
@@ -151,9 +150,20 @@ public interface ProcessScheduler {
void unschedule(ReportingTaskNode taskNode);
/**
- * Begins scheduling the given Reporting Taks to run
+ * Begins scheduling the given Reporting Task to run
* @param taskNode
*/
void schedule(ReportingTaskNode taskNode);
+ /**
+ * Enables the Controller Service so that it can be used by Reporting Tasks and Processors
+ * @param service
+ */
+ void enableControllerService(ControllerServiceNode service);
+
+ /**
+ * Disables the Controller Service so that it can be updated
+ * @param service
+ */
+ void disableControllerService(ControllerServiceNode service);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index 81ef7c0..3189edd 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -21,6 +21,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.processor.Processor;
@@ -82,4 +83,14 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen
* @return
*/
public abstract int getActiveThreadCount();
+
+ /**
+ * Verifies that this Processor can be started if the provided set of
+ * services are enabled. This is introduced because we need to verify that all components
+ * can be started before starting any of them. In order to do that, we need to know that this
+ * component can be started if the given services are enabled, as we will then enable the given
+ * services before starting this component.
+ * @param ignoredReferences
+ */
+ public abstract void verifyCanStart(Set<ControllerServiceNode> ignoredReferences);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
index 76285d1..c932f30 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
@@ -16,8 +16,10 @@
*/
package org.apache.nifi.controller;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
+import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
@@ -74,6 +76,16 @@ public interface ReportingTaskNode extends ConfiguredComponent {
void setComments(String comment);
+ /**
+ * Verifies that this Reporting Task can be started if the provided set of
+ * services are enabled. This is introduced because we need to verify that all components
+ * can be started before starting any of them. In order to do that, we need to know that this
+ * component can be started if the given services are enabled, as we will then enable the given
+ * services before starting this component.
+ * @param ignoredReferences
+ */
+ void verifyCanStart(Set<ControllerServiceNode> ignoredReferences);
+
void verifyCanStart();
void verifyCanStop();
void verifyCanDisable();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java
index df3c251..09479d5 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java
@@ -17,6 +17,7 @@
package org.apache.nifi.controller;
import java.util.Map;
+import java.util.Set;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
@@ -24,4 +25,7 @@ import org.apache.nifi.components.ValidationContext;
public interface ValidationContextFactory {
ValidationContext newValidationContext(Map<PropertyDescriptor, String> properties, String annotationData);
+
+ ValidationContext newValidationContext(Set<String> serviceIdentifiersToNotValidate, Map<PropertyDescriptor, String> properties, String annotationData);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index 4822958..68357b8 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -18,6 +18,7 @@ package org.apache.nifi.controller.service;
import java.util.Set;
+import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
@@ -27,18 +28,9 @@ public interface ControllerServiceNode extends ConfiguredComponent {
ControllerService getControllerServiceImplementation();
- boolean isDisabled();
-
- void enable();
- void disable();
+ ControllerServiceState getState();
+ void setState(ControllerServiceState state);
- /**
- * Disables the Controller Service but does not verify that the provided set of referencing
- * Controller Services should be verified as disabled first
- * @param ignoredReferences
- */
- void disable(Set<ControllerServiceNode> ignoredReferences);
-
ControllerServiceReference getReferences();
void addReference(ConfiguredComponent referringComponent);
@@ -62,6 +54,16 @@ public interface ControllerServiceNode extends ConfiguredComponent {
*/
void verifyCanDisable(Set<ControllerServiceNode> ignoredReferences);
+ /**
+ * Verifies that this Controller Service can be enabled if the provided set of
+ * services are also enabled. This is introduced because we can have an instance where
+ * A references B, which references C, which references A and we want to enable
+ * Service A. In this case, the cycle needs to not cause us to fail, so we want to verify
+ * that A can be enabled if B and C also are.
+ * @param ignoredReferences
+ */
+ void verifyCanEnable(Set<ControllerServiceNode> ignoredReferences);
+
void verifyCanDelete();
void verifyCanUpdate();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
index 7a767bf..351a036 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
@@ -75,27 +75,47 @@ public interface ControllerServiceProvider extends ControllerServiceLookup {
*/
Set<ControllerServiceNode> getAllControllerServices();
+
/**
- * Recursively stops all Processors and Reporting Tasks that are referencing the given Controller Service,
- * as well as disabling any Controller Service that references this Controller Service (and stops
- * all Reporting Task or Controller Service that is referencing it, and so on).
+ * Recursively unschedules all schedulable components (Processors and Reporting Tasks) that reference the given
+ * Controller Service. For any Controller Services that reference this one, their schedulable referencing components will also
+ * be unscheduled.
* @param serviceNode
*/
- void deactivateReferencingComponents(ControllerServiceNode serviceNode);
+ void unscheduleReferencingComponents(ControllerServiceNode serviceNode);
/**
- * <p>
- * Starts any enabled Processors and Reporting Tasks that are referencing this Controller Service. If other Controller
- * Services reference this Controller Service, will also enable those services and 'activate' any components referencing
- * them.
- * </p>
- *
- * <p>
- * NOTE: If any component cannot be started, an IllegalStateException will be thrown an no more components will
- * be activated. This method provides no atomicity.
- * </p>
- *
+ * Disables any Controller Service that references the provided Controller Service. This action is performed recursively
+ * so that if service A references B and B references C, disabling references for C will first disable A, then B.
+ * @param serviceNode
+ */
+ void disableReferencingServices(ControllerServiceNode serviceNode);
+
+ /**
+ * Verifies that all Controller Services referencing the provided ControllerService can be enabled.
+ * @param serviceNode
+ */
+ void verifyCanEnableReferencingServices(ControllerServiceNode serviceNode);
+
+ /**
+ * Verifies that all enabled Processors referencing the ControllerService (or a service that depends on
+ * the provided service) can be scheduled to run.
+ * @param serviceNode
+ */
+ void verifyCanScheduleReferencingComponents(ControllerServiceNode serviceNode);
+
+ /**
+ * Enables all Controller Services that are referencing the given service. If Service A references Service B and Service
+ * B references serviceNode, Service A and B will both be enabled.
+ * @param serviceNode
+ */
+ void enableReferencingServices(ControllerServiceNode serviceNode);
+
+ /**
+ * Schedules any schedulable component (Processor, ReportingTask) that is referencing the given Controller Service
+ * to run. This is performed recursively, so if a Processor is referencing Service A, which is referencing serviceNode,
+ * then the Processor will also be started.
* @param serviceNode
*/
- void activateReferencingComponents(ControllerServiceNode serviceNode);
+ void scheduleReferencingComponents(ControllerServiceNode serviceNode);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceState.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceState.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceState.java
new file mode 100644
index 0000000..2ed8fd9
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceState.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service;
+
+
+/**
+ * Represents the valid states for a Controller Service.
+ */
+public enum ControllerServiceState {
+ /**
+ * Controller Service is disabled and cannot be used.
+ */
+ DISABLED,
+
+ /**
+ * Controller Service has been disabled but has not yet finished its lifecycle
+ * methods.
+ */
+ DISABLING,
+
+ /**
+ * Controller Service has been enabled but has not yet finished its lifecycle methods.
+ */
+ ENABLING,
+
+ /**
+ * Controller Service has been enabled and has finished its lifecycle methods. The Controller Service
+ * is ready to be used.
+ */
+ ENABLED;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 2825d5b..0a86145 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2570,12 +2570,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
return reportingTasks.values();
}
-
- @Override
- public void activateReferencingComponents(final ControllerServiceNode serviceNode) {
- controllerServiceProvider.activateReferencingComponents(serviceNode);
- }
-
@Override
public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
return controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
@@ -2591,10 +2585,24 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
processScheduler.disableReportingTask(reportingTaskNode);
}
+ @Override
+ public void disableReferencingServices(final ControllerServiceNode serviceNode) {
+ controllerServiceProvider.disableReferencingServices(serviceNode);
+ }
+
+ @Override
+ public void enableReferencingServices(final ControllerServiceNode serviceNode) {
+ controllerServiceProvider.enableReferencingServices(serviceNode);
+ }
@Override
- public void deactivateReferencingComponents(final ControllerServiceNode serviceNode) {
- controllerServiceProvider.deactivateReferencingComponents(serviceNode);
+ public void scheduleReferencingComponents(final ControllerServiceNode serviceNode) {
+ controllerServiceProvider.scheduleReferencingComponents(serviceNode);
+ }
+
+ @Override
+ public void unscheduleReferencingComponents(final ControllerServiceNode serviceNode) {
+ controllerServiceProvider.unscheduleReferencingComponents(serviceNode);
}
@Override
@@ -2609,6 +2617,15 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
controllerServiceProvider.disableControllerService(serviceNode);
}
+ @Override
+ public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) {
+ controllerServiceProvider.verifyCanEnableReferencingServices(serviceNode);
+ }
+
+ @Override
+ public void verifyCanScheduleReferencingComponents(final ControllerServiceNode serviceNode) {
+ controllerServiceProvider.verifyCanScheduleReferencingComponents(serviceNode);
+ }
@Override
public ControllerService getControllerService(final String serviceIdentifier) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
index f281fa7..832df7c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
@@ -41,6 +41,7 @@ import org.apache.nifi.connectable.Position;
import org.apache.nifi.connectable.Size;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.groups.ProcessGroup;
@@ -412,7 +413,10 @@ public class StandardFlowSerializer implements FlowSerializer {
addTextElement(serviceElement, "name", serviceNode.getName());
addTextElement(serviceElement, "comment", serviceNode.getComments());
addTextElement(serviceElement, "class", serviceNode.getControllerServiceImplementation().getClass().getCanonicalName());
- addTextElement(serviceElement, "enabled", String.valueOf(!serviceNode.isDisabled()));
+
+ final ControllerServiceState state = serviceNode.getState();
+ final boolean enabled = (state == ControllerServiceState.ENABLED || state == ControllerServiceState.ENABLING);
+ addTextElement(serviceElement, "enabled", String.valueOf(enabled));
addConfiguration(serviceElement, serviceNode.getProperties(), serviceNode.getAnnotationData(), encryptor);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index af25955..355e303 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -52,6 +52,7 @@ import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Position;
+import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.LogLevel;
@@ -120,7 +121,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
private SchedulingStrategy schedulingStrategy; // guarded by read/write lock
@SuppressWarnings("deprecation")
- StandardProcessorNode(final Processor processor, final String uuid, final ValidationContextFactory validationContextFactory,
+ public StandardProcessorNode(final Processor processor, final String uuid, final ValidationContextFactory validationContextFactory,
final ProcessScheduler scheduler, final ControllerServiceProvider controllerServiceProvider) {
super(processor, uuid, validationContextFactory, controllerServiceProvider);
@@ -1192,8 +1193,13 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
public void verifyCanStart() {
readLock.lock();
try {
- if (scheduledState.get() != ScheduledState.STOPPED) {
- throw new IllegalStateException(this + " is not stopped");
+ switch (getScheduledState()) {
+ case DISABLED:
+ throw new IllegalStateException(this + " cannot be started because it is disabled");
+ case RUNNING:
+ throw new IllegalStateException(this + " cannot be started because it is already running");
+ case STOPPED:
+ break;
}
verifyNoActiveThreads();
@@ -1204,6 +1210,31 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
readLock.unlock();
}
}
+
+ @Override
+ public void verifyCanStart(final Set<ControllerServiceNode> ignoredReferences) {
+ switch (getScheduledState()) {
+ case DISABLED:
+ throw new IllegalStateException(this + " cannot be started because it is disabled");
+ case RUNNING:
+ throw new IllegalStateException(this + " cannot be started because it is already running");
+ case STOPPED:
+ break;
+ }
+ verifyNoActiveThreads();
+
+ final Set<String> ids = new HashSet<>();
+ for ( final ControllerServiceNode node : ignoredReferences ) {
+ ids.add(node.getIdentifier());
+ }
+
+ final Collection<ValidationResult> validationResults = getValidationErrors(ids);
+ for ( final ValidationResult result : validationResults ) {
+ if ( !result.isValid() ) {
+ throw new IllegalStateException(this + " cannot be started because it is not valid: " + result);
+ }
+ }
+ }
@Override
public void verifyCanStop() {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
index 079ff31..272c0ba 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
@@ -16,9 +16,13 @@
*/
package org.apache.nifi.controller.reporting;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractConfiguredComponent;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceLookup;
@@ -28,6 +32,7 @@ import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.annotation.OnConfigured;
import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
+import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.nar.NarCloseable;
@@ -212,4 +217,32 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
throw new IllegalStateException("Cannot update " + reportingTask + " because it is currently running");
}
}
+
+ @Override
+ public void verifyCanStart(final Set<ControllerServiceNode> ignoredReferences) {
+ switch (getScheduledState()) {
+ case DISABLED:
+ throw new IllegalStateException(this + " cannot be started because it is disabled");
+ case RUNNING:
+ throw new IllegalStateException(this + " cannot be started because it is already running");
+ case STOPPED:
+ break;
+ }
+ final int activeThreadCount = getActiveThreadCount();
+ if ( activeThreadCount > 0 ) {
+ throw new IllegalStateException(this + " cannot be started because it has " + activeThreadCount + " active threads already");
+ }
+
+ final Set<String> ids = new HashSet<>();
+ for ( final ControllerServiceNode node : ignoredReferences ) {
+ ids.add(node.getIdentifier());
+ }
+
+ final Collection<ValidationResult> validationResults = getValidationErrors(ids);
+ for ( final ValidationResult result : validationResults ) {
+ if ( !result.isValid() ) {
+ throw new IllegalStateException(this + " cannot be started because it is not valid: " + result);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index b6699d2..2b4757d 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -45,6 +45,8 @@ import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.annotation.OnConfigured;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.logging.ProcessorLog;
@@ -212,7 +214,8 @@ public final class StandardProcessScheduler implements ProcessScheduler {
if (!scheduleState.isScheduled()) {
return;
}
-
+
+ taskNode.verifyCanStop();
final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy());
final ReportingTask reportingTask = taskNode.getReportingTask();
scheduleState.setScheduled(false);
@@ -313,11 +316,12 @@ public final class StandardProcessScheduler implements ProcessScheduler {
return;
}
} catch (final Exception e) {
+ final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e;
final ProcessorLog procLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
procLog.error("{} failed to invoke @OnScheduled method due to {}; processor will not be scheduled to run for {}",
- new Object[]{procNode.getProcessor(), e.getCause(), administrativeYieldDuration}, e.getCause());
- LOG.error("Failed to invoke @OnScheduled method due to {}", e.getCause().toString(), e.getCause());
+ new Object[]{procNode.getProcessor(), cause.getCause(), administrativeYieldDuration}, cause.getCause());
+ LOG.error("Failed to invoke @OnScheduled method due to {}", cause.getCause().toString(), cause.getCause());
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, procNode.getProcessor(), processContext);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, procNode.getProcessor(), processContext);
@@ -610,4 +614,109 @@ public final class StandardProcessScheduler implements ProcessScheduler {
}
return scheduleState;
}
+
+ @Override
+ public void enableControllerService(final ControllerServiceNode service) {
+ service.verifyCanEnable();
+ service.setState(ControllerServiceState.ENABLING);
+ final ScheduleState scheduleState = getScheduleState(service);
+
+ final Runnable enableRunnable = new Runnable() {
+ @Override
+ public void run() {
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
+ long lastStopTime = scheduleState.getLastStopTime();
+ final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider);
+
+ while (true) {
+ try {
+ synchronized (scheduleState) {
+ // if no longer enabled, then we're finished. This can happen, for example,
+ // if the @OnEnabled method throws an Exception and the user disables the service
+ // while we're administratively yielded.
+ //
+ // we also check if the schedule state's last stop time is equal to what it was before.
+ // if not, then it means that the service has been disabled and enabled again, so we should just
+ // bail; another thread will be responsible for invoking the @OnEnabled methods.
+ if (!scheduleState.isScheduled() || scheduleState.getLastStopTime() != lastStopTime) {
+ return;
+ }
+
+ ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, service.getControllerServiceImplementation(), configContext);
+ heartbeater.heartbeat();
+ service.setState(ControllerServiceState.ENABLED);
+ return;
+ }
+ } catch (final Exception e) {
+ // TODO: Generate a bulletin just like in startProcessor
+ final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e;
+ LOG.error("Failed to invoke @OnEnabled method of {} due to {}", service.getControllerServiceImplementation(), cause.toString());
+ if ( LOG.isDebugEnabled() ) {
+ LOG.error("", cause);
+ }
+
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
+ Thread.sleep(administrativeYieldMillis);
+ continue;
+ }
+ }
+ } catch (final Throwable t) {
+ // TODO: Generate a bulletin just like in startProcessor
+ final Throwable cause = (t instanceof InvocationTargetException) ? t.getCause() : t;
+ LOG.error("Failed to invoke @OnEnabled method on {} due to {}", service.getControllerServiceImplementation(), cause.toString());
+ if ( LOG.isDebugEnabled() ) {
+ LOG.error("", cause);
+ }
+ }
+ }
+ };
+
+ scheduleState.setScheduled(true);
+ componentLifeCycleThreadPool.execute(enableRunnable);
+ }
+
+ @Override
+ public void disableControllerService(final ControllerServiceNode service) {
+ service.verifyCanDisable();
+
+ final ScheduleState state = getScheduleState(requireNonNull(service));
+ final Runnable disableRunnable = new Runnable() {
+ @Override
+ public void run() {
+ synchronized (state) {
+ state.setScheduled(false);
+ }
+
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
+ final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider);
+
+ while(true) {
+ try {
+ ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
+ heartbeater.heartbeat();
+ service.setState(ControllerServiceState.DISABLED);
+ return;
+ } catch (final Exception e) {
+ // TODO: Generate a bulletin just like in startProcessor
+ final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e;
+ LOG.error("Failed to invoke @OnDisabled method of {} due to {}", service.getControllerServiceImplementation(), cause.toString());
+ if ( LOG.isDebugEnabled() ) {
+ LOG.error("", cause);
+ }
+
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
+ try {
+ Thread.sleep(administrativeYieldMillis);
+ } catch (final InterruptedException ie) {}
+
+ continue;
+ }
+ }
+ }
+ }
+ };
+
+ service.setState(ControllerServiceState.DISABLING);
+ componentLifeCycleThreadPool.execute(disableRunnable);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index b29f86b..c8c7ec9 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -16,14 +16,16 @@
*/
package org.apache.nifi.controller.service;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractConfiguredComponent;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ConfiguredComponent;
@@ -40,7 +42,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
private final ControllerService implementation;
private final ControllerServiceProvider serviceProvider;
- private final AtomicBoolean disabled = new AtomicBoolean(true);
+ private final AtomicReference<ControllerServiceState> stateRef = new AtomicReference<>(ControllerServiceState.DISABLED);
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
@@ -57,34 +59,8 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
this.serviceProvider = serviceProvider;
}
- @Override
- public boolean isDisabled() {
- return disabled.get();
- }
-
-
- @Override
- public void enable() {
- if ( !isValid() ) {
- throw new IllegalStateException("Cannot enable Controller Service " + implementation + " because it is not valid");
- }
-
- this.disabled.set(false);
- }
@Override
- public void disable() {
- verifyCanDisable();
- this.disabled.set(true);
- }
-
- @Override
- public void disable(final Set<ControllerServiceNode> ignoredReferences) {
- verifyCanDisable(ignoredReferences);
- this.disabled.set(true);
- }
-
- @Override
public ControllerService getProxiedControllerService() {
return proxedControllerService;
}
@@ -126,7 +102,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
@Override
public void verifyModifiable() throws IllegalStateException {
- if (!isDisabled()) {
+ if (getState() != ControllerServiceState.DISABLED) {
throw new IllegalStateException("Cannot modify Controller Service configuration because it is currently enabled. Please disable the Controller Service first.");
}
}
@@ -134,7 +110,6 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
@Override
public void setProperty(final String name, final String value) {
super.setProperty(name, value);
-
onConfigured();
}
@@ -160,7 +135,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
@Override
public void verifyCanDelete() {
- if ( !isDisabled() ) {
+ if ( getState() != ControllerServiceState.DISABLED ) {
throw new IllegalStateException(implementation + " cannot be deleted because it is not disabled");
}
}
@@ -172,6 +147,11 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
@Override
public void verifyCanDisable(final Set<ControllerServiceNode> ignoreReferences) {
+ final ControllerServiceState state = getState();
+ if ( state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING ) {
+ throw new IllegalStateException("Cannot disable " + getControllerServiceImplementation() + " because it is not enabled");
+ }
+
final ControllerServiceReference references = getReferences();
for ( final ConfiguredComponent activeReference : references.getActiveReferences() ) {
@@ -183,14 +163,37 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
@Override
public void verifyCanEnable() {
- if ( !isDisabled() ) {
+ if ( getState() != ControllerServiceState.DISABLED ) {
throw new IllegalStateException(implementation + " cannot be enabled because it is not disabled");
}
+
+ if ( !isValid() ) {
+ throw new IllegalStateException(implementation + " cannot be enabled because it is not valid: " + getValidationErrors());
+ }
+ }
+
+ @Override
+ public void verifyCanEnable(final Set<ControllerServiceNode> ignoredReferences) {
+ if (getState() != ControllerServiceState.DISABLED) {
+ throw new IllegalStateException(implementation + " cannot be enabled because it is not disabled");
+ }
+
+ final Set<String> ids = new HashSet<>();
+ for ( final ControllerServiceNode node : ignoredReferences ) {
+ ids.add(node.getIdentifier());
+ }
+
+ final Collection<ValidationResult> validationResults = getValidationErrors(ids);
+ for ( final ValidationResult result : validationResults ) {
+ if ( !result.isValid() ) {
+ throw new IllegalStateException(implementation + " cannot be enabled because it is not valid: " + result);
+ }
+ }
}
@Override
public void verifyCanUpdate() {
- if ( !isDisabled() ) {
+ if ( getState() != ControllerServiceState.DISABLED ) {
throw new IllegalStateException(implementation + " cannot be updated because it is not disabled");
}
}
@@ -214,4 +217,14 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
writeLock.unlock();
}
}
+
+ @Override
+ public ControllerServiceState getState() {
+ return stateRef.get();
+ }
+
+ @Override
+ public void setState(final ControllerServiceState state) {
+ this.stateRef.set(state);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index c584188..d6596a4 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -32,8 +32,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.nifi.annotation.lifecycle.OnAdded;
-import org.apache.nifi.annotation.lifecycle.OnDisabled;
-import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
@@ -42,6 +40,7 @@ import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.exception.ControllerServiceNotFoundException;
import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
@@ -128,7 +127,9 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
final ControllerServiceNode node = serviceNodeHolder.get();
- if (node.isDisabled() && !validDisabledMethods.contains(method)) {
+ final ControllerServiceState state = node.getState();
+ final boolean disabled = (state != ControllerServiceState.ENABLED); // only allow method call if service state is ENABLED.
+ if (disabled && !validDisabledMethods.contains(method)) {
// Use nar class loader here because we are implicitly calling toString() on the original implementation.
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
throw new IllegalStateException("Cannot invoke method " + method + " on Controller Service " + originalService + " because the Controller Service is disabled");
@@ -182,29 +183,108 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
}
+
+
@Override
- public void enableControllerService(final ControllerServiceNode serviceNode) {
- serviceNode.verifyCanEnable();
+ public void disableReferencingServices(final ControllerServiceNode serviceNode) {
+ // Get a list of all Controller Services that need to be disabled, in the order that they need to be
+ // disabled.
+ final List<ControllerServiceNode> toDisable = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
+ final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable);
- try (final NarCloseable x = NarCloseable.withNarLoader()) {
- final ConfigurationContext configContext = new StandardConfigurationContext(serviceNode, this);
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, serviceNode.getControllerServiceImplementation(), configContext);
+ for ( final ControllerServiceNode nodeToDisable : toDisable ) {
+ final ControllerServiceState state = nodeToDisable.getState();
+
+ if ( state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING ) {
+ nodeToDisable.verifyCanDisable(serviceSet);
+ }
}
- serviceNode.enable();
+ Collections.reverse(toDisable);
+ for ( final ControllerServiceNode nodeToDisable : toDisable ) {
+ final ControllerServiceState state = nodeToDisable.getState();
+
+ if ( state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING ) {
+ disableControllerService(nodeToDisable);
+ }
+ }
}
+
@Override
- public void disableControllerService(final ControllerServiceNode serviceNode) {
- serviceNode.verifyCanDisable();
-
- // We must set the service to disabled before we invoke the OnDisabled methods because the service node
- // can throw Exceptions if we attempt to disable the service while it's known to be in use.
- serviceNode.disable();
+ public void scheduleReferencingComponents(final ControllerServiceNode serviceNode) {
+ // find all of the schedulable components (processors, reporting tasks) that refer to this Controller Service,
+ // or a service that references this controller service, etc.
+ final List<ProcessorNode> processors = findRecursiveReferences(serviceNode, ProcessorNode.class);
+ final List<ReportingTaskNode> reportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class);
- try (final NarCloseable x = NarCloseable.withNarLoader()) {
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, serviceNode.getControllerServiceImplementation());
+ // verify that we can start all components (that are not disabled) before doing anything
+ for ( final ProcessorNode node : processors ) {
+ if ( node.getScheduledState() != ScheduledState.DISABLED ) {
+ node.verifyCanStart();
+ }
+ }
+ for ( final ReportingTaskNode node : reportingTasks ) {
+ if ( node.getScheduledState() != ScheduledState.DISABLED ) {
+ node.verifyCanStart();
+ }
+ }
+
+ // start all of the components that are not disabled
+ for ( final ProcessorNode node : processors ) {
+ if ( node.getScheduledState() != ScheduledState.DISABLED ) {
+ node.getProcessGroup().startProcessor(node);
+ }
}
+ for ( final ReportingTaskNode node : reportingTasks ) {
+ if ( node.getScheduledState() != ScheduledState.DISABLED ) {
+ processScheduler.schedule(node);
+ }
+ }
+ }
+
+ @Override
+ public void unscheduleReferencingComponents(final ControllerServiceNode serviceNode) {
+ // find all of the schedulable components (processors, reporting tasks) that refer to this Controller Service,
+ // or a service that references this controller service, etc.
+ final List<ProcessorNode> processors = findRecursiveReferences(serviceNode, ProcessorNode.class);
+ final List<ReportingTaskNode> reportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class);
+
+ // verify that we can stop all components (that are running) before doing anything
+ for ( final ProcessorNode node : processors ) {
+ if ( node.getScheduledState() == ScheduledState.RUNNING ) {
+ node.verifyCanStop();
+ }
+ }
+ for ( final ReportingTaskNode node : reportingTasks ) {
+ if ( node.getScheduledState() == ScheduledState.RUNNING ) {
+ node.verifyCanStop();
+ }
+ }
+
+ // stop all of the components that are running
+ for ( final ProcessorNode node : processors ) {
+ if ( node.getScheduledState() == ScheduledState.RUNNING ) {
+ node.getProcessGroup().stopProcessor(node);
+ }
+ }
+ for ( final ReportingTaskNode node : reportingTasks ) {
+ if ( node.getScheduledState() == ScheduledState.RUNNING ) {
+ processScheduler.unschedule(node);
+ }
+ }
+ }
+
+ @Override
+ public void enableControllerService(final ControllerServiceNode serviceNode) {
+ serviceNode.verifyCanEnable();
+ processScheduler.enableControllerService(serviceNode);
+ }
+
+ @Override
+ public void disableControllerService(final ControllerServiceNode serviceNode) {
+ serviceNode.verifyCanDisable();
+ processScheduler.disableControllerService(serviceNode);
}
@Override
@@ -221,7 +301,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
@Override
public boolean isControllerServiceEnabled(final String serviceIdentifier) {
final ControllerServiceNode node = controllerServices.get(serviceIdentifier);
- return (node == null) ? false : !node.isDisabled();
+ return (node == null) ? false : (ControllerServiceState.ENABLED == node.getState());
}
@Override
@@ -281,120 +361,94 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
return new HashSet<>(controllerServices.values());
}
- @Override
- public void deactivateReferencingComponents(final ControllerServiceNode serviceNode) {
- deactivateReferencingComponents(serviceNode, new HashSet<ControllerServiceNode>());
- }
- private void deactivateReferencingComponents(final ControllerServiceNode serviceNode, final Set<ControllerServiceNode> visited) {
- final ControllerServiceReference reference = serviceNode.getReferences();
+ /**
+ * Returns a List of all components that reference the given referencedNode (either directly or indirectly through
+ * another service) that are also of the given componentType. The list that is returned is in the order in which they will
+ * need to be 'activated' (enabled/started).
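+ * @param referencedNode the Controller Service whose direct and indirect referencing components are collected
+ * @param componentType the type that a referencing component must be an instance of in order to be returned
+ * @return the referencing components of the given type, in activation order; an empty List if there are none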
+ */
+ private <T> List<T> findRecursiveReferences(final ControllerServiceNode referencedNode, final Class<T> componentType) {
+ final List<T> references = new ArrayList<>();
- final Set<ConfiguredComponent> components = reference.getActiveReferences();
- for (final ConfiguredComponent component : components) {
- if ( component instanceof ControllerServiceNode ) {
- // If we've already visited this component (there is a loop such that
- // we are disabling Controller Service A, but B depends on A and A depends on B)
- // we don't need to disable this component because it will be disabled after we return
- if ( visited.contains(component) ) {
- continue;
- }
+ for ( final ConfiguredComponent referencingComponent : referencedNode.getReferences().getReferencingComponents() ) {
+ if ( componentType.isAssignableFrom(referencingComponent.getClass()) ) {
+ references.add(componentType.cast(referencingComponent));
+ }
+
+ if ( referencingComponent instanceof ControllerServiceNode ) {
+ final ControllerServiceNode referencingNode = (ControllerServiceNode) referencingComponent;
- visited.add(serviceNode);
- deactivateReferencingComponents((ControllerServiceNode) component, visited);
+ // find components recursively that depend on referencingNode.
+ final List<T> recursive = findRecursiveReferences(referencingNode, componentType);
- if (isControllerServiceEnabled(serviceNode.getIdentifier())) {
- serviceNode.verifyCanDisable(visited);
- serviceNode.disable(visited);
- }
- } else if ( component instanceof ReportingTaskNode ) {
- final ReportingTaskNode taskNode = (ReportingTaskNode) component;
- if (taskNode.isRunning()) {
- taskNode.verifyCanStop();
- processScheduler.unschedule(taskNode);
- }
- } else if ( component instanceof ProcessorNode ) {
- final ProcessorNode procNode = (ProcessorNode) component;
- if ( procNode.isRunning() ) {
- procNode.getProcessGroup().stopProcessor(procNode);
- }
+ // For anything that depends on referencing node, we want to add it to the list, but we know
+ // that it must come after the referencing node, so we first remove any existing occurrence.
+ references.removeAll(recursive);
+ references.addAll(recursive);
}
}
+
+ return references;
}
@Override
- public void activateReferencingComponents(final ControllerServiceNode serviceNode) {
- activateReferencingComponents(serviceNode, new HashSet<ControllerServiceNode>());
+ public void enableReferencingServices(final ControllerServiceNode serviceNode) {
+ final List<ControllerServiceNode> recursiveReferences = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
+ enableReferencingServices(serviceNode, recursiveReferences);
}
-
- /**
- * Recursively enables this controller service and any controller service that it references.
- * @param serviceNode
- */
- private void activateReferencedComponents(final ControllerServiceNode serviceNode) {
- for ( final Map.Entry<PropertyDescriptor, String> entry : serviceNode.getProperties().entrySet() ) {
- final PropertyDescriptor key = entry.getKey();
- if ( key.getControllerServiceDefinition() == null ) {
- continue;
- }
-
- final String serviceId = entry.getValue() == null ? key.getDefaultValue() : entry.getValue();
- if ( serviceId == null ) {
- continue;
- }
-
- final ControllerServiceNode referencedNode = getControllerServiceNode(serviceId);
- if ( referencedNode == null ) {
- throw new IllegalStateException("Cannot activate referenced component of " + serviceNode + " because no service exists with ID " + serviceId);
+ private void enableReferencingServices(final ControllerServiceNode serviceNode, final List<ControllerServiceNode> recursiveReferences) {
+ serviceNode.verifyCanEnable(new HashSet<>(recursiveReferences));
+
+ final List<ControllerServiceNode> toEnable = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
+ for ( final ControllerServiceNode nodeToEnable : toEnable ) {
+ final ControllerServiceState state = nodeToEnable.getState();
+ if ( state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING ) {
+ nodeToEnable.verifyCanEnable();
}
-
- activateReferencedComponents(referencedNode);
-
- if ( referencedNode.isDisabled() ) {
- enableControllerService(referencedNode);
+ }
+
+ for ( final ControllerServiceNode nodeToEnable : toEnable ) {
+ final ControllerServiceState state = nodeToEnable.getState();
+ if ( state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING ) {
+ enableControllerService(nodeToEnable);
}
}
}
- private void activateReferencingComponents(final ControllerServiceNode serviceNode, final Set<ControllerServiceNode> visited) {
- if ( serviceNode.isDisabled() ) {
- throw new IllegalStateException("Cannot activate referencing components of " + serviceNode.getControllerServiceImplementation() + " because the Controller Service is disabled");
+ @Override
+ public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) {
+ final List<ControllerServiceNode> referencingServices = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
+ final Set<ControllerServiceNode> referencingServiceSet = new HashSet<>(referencingServices);
+
+ for ( final ControllerServiceNode referencingService : referencingServices ) {
+ referencingService.verifyCanEnable(referencingServiceSet);
}
+ }
+
+ @Override
+ public void verifyCanScheduleReferencingComponents(final ControllerServiceNode serviceNode) {
+ final List<ControllerServiceNode> referencingServices = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
+ final List<ReportingTaskNode> referencingReportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class);
+ final List<ProcessorNode> referencingProcessors = findRecursiveReferences(serviceNode, ProcessorNode.class);
- final ControllerServiceReference ref = serviceNode.getReferences();
- final Set<ConfiguredComponent> components = ref.getReferencingComponents();
+ final Set<ControllerServiceNode> referencingServiceSet = new HashSet<>(referencingServices);
- // First, activate any other controller services. We do this first so that we can
- // avoid the situation where Processor X depends on Controller Services Y and Z; and
- // Controller Service Y depends on Controller Service Z. In this case, if we first attempted
- // to start Processor X, we would fail because Controller Service Y is disabled. THis way, we
- // can recursively enable everything.
- for ( final ConfiguredComponent component : components ) {
- if (component instanceof ControllerServiceNode) {
- final ControllerServiceNode componentNode = (ControllerServiceNode) component;
- activateReferencedComponents(componentNode);
-
- if ( componentNode.isDisabled() ) {
- enableControllerService(componentNode);
- }
-
- activateReferencingComponents(componentNode);
+ for ( final ReportingTaskNode taskNode : referencingReportingTasks ) {
+ if ( taskNode.getScheduledState() != ScheduledState.DISABLED ) {
+ taskNode.verifyCanStart(referencingServiceSet);
}
}
- for ( final ConfiguredComponent component : components ) {
- if (component instanceof ProcessorNode) {
- final ProcessorNode procNode = (ProcessorNode) component;
- if ( !procNode.isRunning() ) {
- procNode.getProcessGroup().startProcessor(procNode);
- }
- } else if (component instanceof ReportingTaskNode) {
- final ReportingTaskNode taskNode = (ReportingTaskNode) component;
- if ( !taskNode.isRunning() ) {
- processScheduler.schedule(taskNode);
- }
+ for ( final ProcessorNode procNode : referencingProcessors ) {
+ if ( procNode.getScheduledState() != ScheduledState.DISABLED ) {
+ procNode.verifyCanStart(referencingServiceSet);
}
}
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
index 97921d6..c470b99 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
@@ -66,7 +66,8 @@ public class StandardControllerServiceReference implements ControllerServiceRefe
if (component instanceof ControllerServiceNode) {
serviceNodes.add((ControllerServiceNode) component);
- if ( !((ControllerServiceNode) component).isDisabled() ) {
+ final ControllerServiceState state = ((ControllerServiceNode) component).getState();
+ if ( state != ControllerServiceState.DISABLED ) {
activeReferences.add(component);
}
} else if (isRunning(component)) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
index ac58504..c37a80d 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
@@ -25,6 +25,7 @@ import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.ControllerServiceState;
public class StandardSchedulingContext implements SchedulingContext {
@@ -45,8 +46,8 @@ public class StandardSchedulingContext implements SchedulingContext {
throw new IllegalArgumentException("Cannot lease Controller Service because no Controller Service exists with identifier " + identifier);
}
- if (serviceNode.isDisabled()) {
- throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getProxiedControllerService() + " is currently disabled");
+ if ( serviceNode.getState() != ControllerServiceState.ENABLED ) {
+ throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getProxiedControllerService() + " is not currently enabled");
}
if (!serviceNode.isValid()) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java
index b216572..57f13d2 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processor;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import org.apache.nifi.attribute.expression.language.PreparedQuery;
import org.apache.nifi.attribute.expression.language.Query;
@@ -38,11 +39,17 @@ public class StandardValidationContext implements ValidationContext {
private final Map<PropertyDescriptor, String> properties;
private final Map<PropertyDescriptor, PreparedQuery> preparedQueries;
private final String annotationData;
+ private final Set<String> serviceIdentifiersToNotValidate;
public StandardValidationContext(final ControllerServiceProvider controllerServiceProvider, final Map<PropertyDescriptor, String> properties, final String annotationData) {
+ this(controllerServiceProvider, Collections.<String>emptySet(), properties, annotationData);
+ }
+
+ public StandardValidationContext(final ControllerServiceProvider controllerServiceProvider, final Set<String> serviceIdentifiersToNotValidate, final Map<PropertyDescriptor, String> properties, final String annotationData) {
this.controllerServiceProvider = controllerServiceProvider;
this.properties = new HashMap<>(properties);
this.annotationData = annotationData;
+ this.serviceIdentifiersToNotValidate = serviceIdentifiersToNotValidate;
preparedQueries = new HashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
@@ -94,4 +101,11 @@ public class StandardValidationContext implements ValidationContext {
public ControllerServiceLookup getControllerServiceLookup() {
return controllerServiceProvider;
}
+
+ @Override
+ public boolean isValidationRequired(final ControllerService service) {
+ return !serviceIdentifiersToNotValidate.contains(service.getIdentifier());
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContextFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContextFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContextFactory.java
index e172f93..c3df987 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContextFactory.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContextFactory.java
@@ -17,6 +17,7 @@
package org.apache.nifi.processor;
import java.util.Map;
+import java.util.Set;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
@@ -36,4 +37,8 @@ public class StandardValidationContextFactory implements ValidationContextFactor
return new StandardValidationContext(serviceProvider, properties, annotationData);
}
+ @Override
+ public ValidationContext newValidationContext(final Set<String> serviceIdentifiersToNotValidate, final Map<PropertyDescriptor, String> properties, final String annotationData) {
+ return new StandardValidationContext(serviceProvider, serviceIdentifiersToNotValidate, properties, annotationData);
+ }
}
[4/5] incubator-nifi git commit: NIFI-250: Refactoring of controller
service states
Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
index fbd3dd7..66abf30 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
@@ -16,14 +16,27 @@
*/
package org.apache.nifi.controller.service;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.UUID;
import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.StandardProcessorNode;
+import org.apache.nifi.controller.service.mock.DummyProcessor;
import org.apache.nifi.controller.service.mock.ServiceA;
import org.apache.nifi.controller.service.mock.ServiceB;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.StandardProcessorInitializationContext;
+import org.apache.nifi.processor.StandardValidationContextFactory;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
public class TestStandardControllerServiceProvider {
@@ -68,11 +81,11 @@ public class TestStandardControllerServiceProvider {
@Test
- public void testActivateReferencingComponentsGraph() {
+ public void testEnableReferencingServicesGraph() {
final ProcessScheduler scheduler = Mockito.mock(ProcessScheduler.class);
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler);
- // build a graph of components with dependencies as such:
+ // build a graph of controller services with dependencies as such:
//
// A -> B -> D
// C ---^----^
@@ -81,7 +94,7 @@ public class TestStandardControllerServiceProvider {
// AND
// C references B and D.
//
- // So we have to verify that if D is enabled, when we enable its referencing components,
+ // So we have to verify that if D is enabled, when we enable its referencing services,
// we enable C and B, even if we attempt to enable C before B... i.e., if we try to enable C, we cannot do so
// until B is first enabled so ensure that we enable B first.
@@ -96,12 +109,102 @@ public class TestStandardControllerServiceProvider {
serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4");
provider.enableControllerService(serviceNode4);
- provider.activateReferencingComponents(serviceNode4);
-
- assertFalse(serviceNode3.isDisabled());
- assertFalse(serviceNode2.isDisabled());
- assertFalse(serviceNode1.isDisabled());
+ provider.enableReferencingServices(serviceNode4);
+ assertEquals(ControllerServiceState.DISABLED, serviceNode3.getState());
+ assertEquals(ControllerServiceState.DISABLED, serviceNode2.getState());
+ assertEquals(ControllerServiceState.DISABLED, serviceNode1.getState());
}
+
+ /**
+ * Verifies that the processors and controller services that (directly or transitively) reference
+ * a controller service can be enabled/started and stopped/disabled through the provider, and that
+ * stopping the processors or disabling the referencing services leaves the referenced service enabled.
+ */
+ @Test
+ public void testStartStopReferencingComponents() {
+ final ProcessScheduler scheduler = Mockito.mock(ProcessScheduler.class);
+ final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler);
+
+ // build a graph of processors and controller services with dependencies as such:
+ //
+ // Processor P1 -> A -> B -> D
+ // Processor P2 -> C ---^----^
+ //
+ // In other words, Processor P1 references Controller Service A, which references B, which references D.
+ // AND
+ // Processor P2 references Controller Service C, which references B and D.
+ //
+ // So we have to verify that if D is enabled, when we enable its referencing services,
+ // we enable C and B, even if we attempt to enable C before B... i.e., if we try to enable C, we cannot do so
+ // until B is first enabled so ensure that we enable B first.
+
+ final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false);
+ final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2", false);
+ final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false);
+ final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false);
+
+ // the mocked process group starts/stops a processor by verifying the transition and setting its scheduled state
+ final ProcessGroup mockProcessGroup = Mockito.mock(ProcessGroup.class);
+ Mockito.doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable {
+ final ProcessorNode procNode = (ProcessorNode) invocation.getArguments()[0];
+ procNode.verifyCanStart();
+ procNode.setScheduledState(ScheduledState.RUNNING);
+ return null;
+ }
+ }).when(mockProcessGroup).startProcessor(Mockito.any(ProcessorNode.class));
+
+ Mockito.doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable {
+ final ProcessorNode procNode = (ProcessorNode) invocation.getArguments()[0];
+ procNode.verifyCanStop();
+ procNode.setScheduledState(ScheduledState.STOPPED);
+ return null;
+ }
+ }).when(mockProcessGroup).stopProcessor(Mockito.any(ProcessorNode.class));
+
+ // Processor P1 references Controller Service A (id "1")
+ final String id1 = UUID.randomUUID().toString();
+ final ProcessorNode procNodeA = new StandardProcessorNode(new DummyProcessor(), id1,
+ new StandardValidationContextFactory(provider), scheduler, provider);
+ procNodeA.getProcessor().initialize(new StandardProcessorInitializationContext(id1, null, provider));
+ procNodeA.setProperty(DummyProcessor.SERVICE.getName(), "1");
+ procNodeA.setProcessGroup(mockProcessGroup);
+
+ // Processor P2 references Controller Service C (id "3")
+ final String id2 = UUID.randomUUID().toString();
+ final ProcessorNode procNodeB = new StandardProcessorNode(new DummyProcessor(), id2,
+ new StandardValidationContextFactory(provider), scheduler, provider);
+ procNodeB.getProcessor().initialize(new StandardProcessorInitializationContext(id2, null, provider));
+ procNodeB.setProperty(DummyProcessor.SERVICE.getName(), "3");
+ procNodeB.setProcessGroup(mockProcessGroup);
+
+ serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
+ serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
+ serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
+ serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4");
+
+ provider.enableControllerService(serviceNode4);
+ provider.enableReferencingServices(serviceNode4);
+ provider.scheduleReferencingComponents(serviceNode4);
+
+ // enabling the referencing services must leave A, B and C enabled, and scheduling must start both processors.
+ assertEquals(ControllerServiceState.ENABLED, serviceNode3.getState());
+ assertEquals(ControllerServiceState.ENABLED, serviceNode2.getState());
+ assertEquals(ControllerServiceState.ENABLED, serviceNode1.getState());
+ assertTrue(procNodeA.isRunning());
+ assertTrue(procNodeB.isRunning());
+
+ // stop processors and verify results: only the processors stop, the services stay enabled.
+ provider.unscheduleReferencingComponents(serviceNode4);
+ assertFalse(procNodeA.isRunning());
+ assertFalse(procNodeB.isRunning());
+ assertEquals(ControllerServiceState.ENABLED, serviceNode3.getState());
+ assertEquals(ControllerServiceState.ENABLED, serviceNode2.getState());
+ assertEquals(ControllerServiceState.ENABLED, serviceNode1.getState());
+
+ // disabling the referencing services disables A, B and C but not D itself.
+ provider.disableReferencingServices(serviceNode4);
+ assertEquals(ControllerServiceState.DISABLED, serviceNode3.getState());
+ assertEquals(ControllerServiceState.DISABLED, serviceNode2.getState());
+ assertEquals(ControllerServiceState.DISABLED, serviceNode1.getState());
+ assertEquals(ControllerServiceState.ENABLED, serviceNode4.getState());
+
+ provider.disableControllerService(serviceNode4);
+ assertEquals(ControllerServiceState.DISABLED, serviceNode4.getState());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java
new file mode 100644
index 0000000..615e172
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service.mock;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+public class DummyProcessor extends AbstractProcessor { // mock processor exposing a single controller-service property
+ // SERVICE: required property whose value identifies a controller service (any ControllerService type).
+ public static final PropertyDescriptor SERVICE = new PropertyDescriptor.Builder()
+ .name("Controller Service")
+ .identifiesControllerService(ControllerService.class)
+ .required(true)
+ .build();
+
+ // Returns a new list containing only SERVICE, the sole property this processor supports.
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(SERVICE);
+ return descriptors;
+ }
+ // Empty body: this processor performs no work when triggered.
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
index c55960f..9c2f0e0 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
@@ -146,9 +146,9 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
if (enabled != null) {
if (enabled) {
-
+ serviceProvider.enableReferencingServices(controllerService);
} else {
-
+ serviceProvider.disableReferencingServices(controllerService);
}
} else if (state != null) {
try {
@@ -156,13 +156,14 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
switch (scheduledState) {
case RUNNING:
-
+ serviceProvider.scheduleReferencingComponents(controllerService);
break;
case STOPPED:
-
+ serviceProvider.unscheduleReferencingComponents(controllerService);
break;
- default:
-
+ default:
+ throw new IllegalArgumentException(String.format(
+ "The specified state (%s) is not valid. Valid options are 'RUNNING' and 'STOPPED'.", state));
}
} catch (IllegalArgumentException iae) {
throw new IllegalArgumentException(String.format(
[2/5] incubator-nifi git commit: Merge branch 'NIFI-250' of
http://git-wip-us.apache.org/repos/asf/incubator-nifi into NIFI-250
Posted by ma...@apache.org.
Merge branch 'NIFI-250' of http://git-wip-us.apache.org/repos/asf/incubator-nifi into NIFI-250
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/2ac64329
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/2ac64329
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/2ac64329
Branch: refs/heads/NIFI-250
Commit: 2ac643292c691d451cf98e2a309bff72182b0a88
Parents: d30a184 6457929
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Feb 18 08:15:45 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Feb 18 08:15:45 2015 -0500
----------------------------------------------------------------------
README.md | 6 +-
nifi/README.md | 2 +-
nifi/nifi-api/pom.xml | 1 -
nifi/nifi-assembly/pom.xml | 1 -
.../nifi-data-provenance-utils/pom.xml | 1 -
.../nifi-expression-language/pom.xml | 1 -
.../nifi-commons/nifi-flowfile-packager/pom.xml | 1 -
nifi/nifi-commons/nifi-logging-utils/pom.xml | 1 -
.../nifi-processor-utilities/pom.xml | 1 -
nifi/nifi-commons/nifi-properties/pom.xml | 1 -
nifi/nifi-commons/nifi-security-utils/pom.xml | 1 -
nifi/nifi-commons/nifi-socket-utils/pom.xml | 1 -
nifi/nifi-commons/nifi-web-utils/pom.xml | 1 -
nifi/nifi-commons/nifi-write-ahead-log/pom.xml | 1 -
.../src/main/asciidoc/administration-guide.adoc | 139 +++++++-
nifi/nifi-docs/src/main/asciidoc/images/ncm.png | Bin 0 -> 339522 bytes
nifi/nifi-docs/src/main/asciidoc/overview.adoc | 20 +-
nifi/nifi-mock/pom.xml | 1 -
.../nifi-framework-nar/pom.xml | 1 -
.../nifi-framework/nifi-administration/pom.xml | 1 -
.../nifi/web/api/dto/ControllerServiceDTO.java | 146 +--------
...ontrollerServiceReferencingComponentDTO.java | 160 +++++++++
...ollerServiceReferencingComponentsEntity.java | 46 +++
.../nifi-cluster-protocol/pom.xml | 1 -
.../nifi-file-authorization-provider/pom.xml | 1 -
.../nifi-framework-core-api/pom.xml | 1 -
.../nifi-framework/nifi-security/pom.xml | 1 -
.../nifi-framework/nifi-site-to-site/pom.xml | 1 -
.../nifi-framework/nifi-user-actions/pom.xml | 1 -
.../nifi/audit/ControllerServiceAuditor.java | 74 +++++
.../org/apache/nifi/web/NiFiServiceFacade.java | 19 ++
.../nifi/web/StandardNiFiServiceFacade.java | 19 ++
.../nifi/web/api/ControllerServiceResource.java | 114 +++++++
.../org/apache/nifi/web/api/dto/DtoFactory.java | 70 ++--
.../nifi/web/dao/ControllerServiceDAO.java | 10 +
.../dao/impl/StandardControllerServiceDAO.java | 18 +
.../src/main/webapp/WEB-INF/pages/canvas.jsp | 1 +
.../canvas/controller-service-configuration.jsp | 4 +-
.../disable-controller-service-dialog.jsp | 5 +-
.../canvas/enable-controller-service-dialog.jsp | 40 +++
.../partials/canvas/fill-color-dialog.jsp | 6 +-
.../src/main/webapp/css/controller-service.css | 53 ++-
.../nifi-web-ui/src/main/webapp/css/dialog.css | 8 +-
.../src/main/webapp/css/settings.css | 1 -
.../src/main/webapp/js/nf/canvas/nf-actions.js | 48 ++-
.../webapp/js/nf/canvas/nf-canvas-header.js | 117 ++++---
.../webapp/js/nf/canvas/nf-canvas-toolbar.js | 18 +-
.../webapp/js/nf/canvas/nf-canvas-toolbox.js | 56 ++--
.../main/webapp/js/nf/canvas/nf-canvas-utils.js | 23 +-
.../src/main/webapp/js/nf/canvas/nf-canvas.js | 25 +-
.../main/webapp/js/nf/canvas/nf-connectable.js | 111 ++++---
.../main/webapp/js/nf/canvas/nf-context-menu.js | 7 +-
.../js/nf/canvas/nf-controller-service.js | 325 +++++++++++++++----
.../src/main/webapp/js/nf/canvas/nf-graph.js | 6 +-
.../src/main/webapp/js/nf/canvas/nf-settings.js | 11 +-
.../webapp/js/nf/cluster/nf-cluster-table.js | 8 +-
.../webapp/js/nf/counters/nf-counters-table.js | 2 +-
.../js/nf/provenance/nf-provenance-table.js | 4 +-
.../webapp/js/nf/summary/nf-summary-table.js | 14 +-
.../js/nf/templates/nf-templates-table.js | 6 +-
.../main/webapp/js/nf/users/nf-users-table.js | 2 +-
.../nifi-framework/pom.xml | 1 -
.../nifi-framework-bundle/pom.xml | 1 -
.../nifi-hadoop-bundle/nifi-hadoop-nar/pom.xml | 1 -
.../nifi-nar-bundles/nifi-hadoop-bundle/pom.xml | 1 -
.../nifi-hadoop-libraries-nar/pom.xml | 1 -
.../nifi-hadoop-libraries-bundle/pom.xml | 1 -
nifi/nifi-nar-bundles/nifi-jetty-bundle/pom.xml | 2 -
.../nifi-kafka-bundle/nifi-kafka-nar/pom.xml | 1 -
nifi/nifi-nar-bundles/nifi-kafka-bundle/pom.xml | 2 +-
.../nifi-provenance-repository-nar/pom.xml | 1 -
.../nifi-provenance-repository-bundle/pom.xml | 1 -
.../nifi-standard-nar/pom.xml | 1 -
.../nifi/processors/standard/EvaluateXPath.java | 2 +-
.../nifi-standard-bundle/pom.xml | 1 -
.../nifi-distributed-cache-services-nar/pom.xml | 1 -
.../pom.xml | 1 -
.../nifi-load-distribution-service-api/pom.xml | 1 -
.../nifi-ssl-context-nar/pom.xml | 1 -
.../nifi-ssl-context-bundle/pom.xml | 1 -
.../nifi-standard-services-api-nar/pom.xml | 1 -
.../nifi-standard-services/pom.xml | 1 -
.../nifi-update-attribute-nar/pom.xml | 2 -
.../nifi-update-attribute-bundle/pom.xml | 3 -
nifi/nifi-nar-bundles/pom.xml | 3 -
85 files changed, 1302 insertions(+), 495 deletions(-)
----------------------------------------------------------------------
[3/5] incubator-nifi git commit: Merge branch 'NIFI-250' of
http://git-wip-us.apache.org/repos/asf/incubator-nifi into NIFI-250
Posted by ma...@apache.org.
Merge branch 'NIFI-250' of http://git-wip-us.apache.org/repos/asf/incubator-nifi into NIFI-250
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/81d84546
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/81d84546
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/81d84546
Branch: refs/heads/NIFI-250
Commit: 81d84546e7b456ae55cefe8892ddfb71a729bac9
Parents: 2ac6432 fac6cd7
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Feb 19 10:38:22 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Feb 19 10:38:22 2015 -0500
----------------------------------------------------------------------
.../web/api/entity/ReportingTaskEntity.java | 45 ++
.../web/api/entity/ReportingTasksEntity.java | 46 ++
.../nifi/audit/ControllerServiceAuditor.java | 2 +
.../org/apache/nifi/web/NiFiServiceFacade.java | 68 ++-
.../nifi/web/StandardNiFiServiceFacade.java | 111 +++-
.../nifi/web/api/ControllerServiceResource.java | 21 +-
.../nifi/web/api/ReportingTaskResource.java | 578 +++++++++++++++++++
.../org/apache/nifi/web/api/dto/DtoFactory.java | 5 +
.../nifi/web/dao/ControllerServiceDAO.java | 7 +-
.../apache/nifi/web/dao/ReportingTaskDAO.java | 88 +++
.../dao/impl/StandardControllerServiceDAO.java | 32 +-
.../web/dao/impl/StandardReportingTaskDAO.java | 230 ++++++++
.../WEB-INF/partials/canvas/canvas-header.jsp | 2 +-
.../disable-controller-service-dialog.jsp | 7 +
.../src/main/webapp/js/nf/canvas/nf-actions.js | 26 +-
.../main/webapp/js/nf/canvas/nf-canvas-utils.js | 4 +
.../js/nf/canvas/nf-controller-service.js | 62 +-
17 files changed, 1291 insertions(+), 43 deletions(-)
----------------------------------------------------------------------