You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/30 19:01:10 UTC
[7/7] incubator-nifi git commit: Load Controller Services from
flow.tar file instead of an external file
Load Controller Services from flow.tar file instead of an external file
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/4ae2a10e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/4ae2a10e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/4ae2a10e
Branch: refs/heads/NIFI-250
Commit: 4ae2a10e6d130353ea34aa403d3bcbad7a2ab4e8
Parents: 52149d8
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 30 13:01:00 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 30 13:01:00 2015 -0500
----------------------------------------------------------------------
.../cluster/manager/impl/WebClusterManager.java | 42 ++++------
.../spring/WebClusterManagerFactoryBean.java | 6 --
.../service/ControllerServiceLoader.java | 88 ++++++--------------
.../nifi/persistence/FlowConfigurationDAO.java | 12 ---
.../StandardXMLFlowConfigurationDAO.java | 11 +--
5 files changed, 41 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4ae2a10e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 50d7087..d3cf6a1 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -16,13 +16,12 @@
*/
package org.apache.nifi.cluster.manager.impl;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.net.URI;
-import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -55,7 +54,6 @@ import javax.net.ssl.SSLContext;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.StreamingOutput;
-import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
@@ -65,9 +63,6 @@ import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
-import javax.xml.validation.Validator;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.admin.service.AuditService;
@@ -140,6 +135,7 @@ import org.apache.nifi.controller.reporting.StandardReportingInitializationConte
import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
+import org.apache.nifi.controller.service.ControllerServiceLoader;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
@@ -444,14 +440,20 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
// load flow
+ final ClusterDataFlow clusterDataFlow;
if (dataFlowManagementService.isFlowCurrent()) {
- final ClusterDataFlow clusterDataFlow = dataFlowManagementService.loadDataFlow();
+ clusterDataFlow = dataFlowManagementService.loadDataFlow();
cachedDataFlow = clusterDataFlow.getDataFlow();
primaryNodeId = clusterDataFlow.getPrimaryNodeId();
} else {
throw new IOException("Flow is not current.");
}
+ final byte[] serializedServices = clusterDataFlow.getControllerServices();
+ if ( serializedServices != null && serializedServices.length > 0 ) {
+ ControllerServiceLoader.loadControllerServices(this, new ByteArrayInputStream(serializedServices));
+ }
+
// start multicast broadcasting service, if configured
if (servicesBroadcaster != null) {
servicesBroadcaster.start();
@@ -461,8 +463,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
executeSafeModeTask();
// Load and start running Reporting Tasks
- final File taskFile = new File(properties.getProperty(NiFiProperties.TASK_CONFIGURATION_FILE));
- reportingTasks.putAll(loadReportingTasks(taskFile));
+ reportingTasks.putAll(loadReportingTasks(clusterDataFlow.getReportingTasks()));
} catch (final IOException ioe) {
logger.warn("Failed to initialize cluster services due to: " + ioe, ioe);
stop();
@@ -876,16 +877,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
reconnectionThread.start();
}
- private Map<String, ReportingTaskNode> loadReportingTasks(final File taskConfigXml) {
+ private Map<String, ReportingTaskNode> loadReportingTasks(final byte[] serialized) {
final Map<String, ReportingTaskNode> tasks = new HashMap<>();
- if (taskConfigXml == null) {
- logger.info("No controller tasks to start");
- return tasks;
- }
try {
- final URL schemaUrl = getClass().getResource("/ReportingTaskConfiguration.xsd");
- final Document document = parse(taskConfigXml, schemaUrl);
+ final Document document = parse(serialized);
final NodeList tasksNodes = document.getElementsByTagName("tasks");
final Element tasksElement = (Element) tasksNodes.item(0);
@@ -957,7 +953,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
tasks.put(reportingTaskNode.getIdentifier(), reportingTaskNode);
}
} catch (final SAXException | ParserConfigurationException | IOException | DOMException | NumberFormatException | InitializationException t) {
- logger.error("Unable to load reporting tasks from {} due to {}", new Object[]{taskConfigXml, t});
+ logger.error("Unable to load reporting tasks due to {}", new Object[]{t});
if (logger.isDebugEnabled()) {
logger.error("", t);
}
@@ -999,11 +995,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
return taskNode;
}
- private Document parse(final File xmlFile, final URL schemaUrl) throws SAXException, ParserConfigurationException, IOException {
- final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
- final Schema schema = schemaFactory.newSchema(schemaUrl);
+ private Document parse(final byte[] serialized) throws SAXException, ParserConfigurationException, IOException {
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
- docFactory.setSchema(schema);
final DocumentBuilder builder = docFactory.newDocumentBuilder();
builder.setErrorHandler(new org.xml.sax.ErrorHandler() {
@@ -1036,12 +1029,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
});
// build the docuemnt
- final Document document = builder.parse(xmlFile);
-
- // ensure schema compliance
- final Validator validator = schema.newValidator();
- validator.validate(new DOMSource(document));
-
+ final Document document = builder.parse(new ByteArrayInputStream(serialized));
return document;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4ae2a10e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
index 7169730..3881461 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
@@ -16,7 +16,6 @@
*/
package org.apache.nifi.cluster.spring;
-import java.nio.file.Paths;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.cluster.event.EventManager;
import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
@@ -26,7 +25,6 @@ import org.apache.nifi.cluster.manager.HttpResponseMapper;
import org.apache.nifi.cluster.manager.impl.WebClusterManager;
import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderListener;
import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster;
-import org.apache.nifi.controller.service.ControllerServiceLoader;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.io.socket.multicast.DiscoverableService;
import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
@@ -106,10 +104,6 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
// set the audit service
clusterManager.setAuditService(applicationContext.getBean("auditService", AuditService.class));
-
- // load the controller services
- final ControllerServiceLoader serviceLoader = new ControllerServiceLoader(Paths.get(serviceConfigurationFile));
- serviceLoader.loadControllerServices(clusterManager);
}
return clusterManager;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4ae2a10e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index 1ca6e54..01114e5 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@ -17,31 +17,21 @@
package org.apache.nifi.controller.service;
import java.io.BufferedInputStream;
-import java.io.File;
import java.io.IOException;
import java.io.InputStream;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
-import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.nifi.controller.Availability;
import org.apache.nifi.util.DomUtils;
-import org.apache.nifi.util.file.FileUtils;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
import org.xml.sax.SAXParseException;
@@ -52,33 +42,14 @@ public class ControllerServiceLoader {
private static final Log logger = LogFactory.getLog(ControllerServiceLoader.class);
- private final Path serviceConfigXmlPath;
- public ControllerServiceLoader(final Path serviceConfigXmlPath) throws IOException {
- final File serviceConfigXmlFile = serviceConfigXmlPath.toFile();
- if (!serviceConfigXmlFile.exists() || !serviceConfigXmlFile.canRead()) {
- throw new IOException(serviceConfigXmlPath + " does not appear to exist or cannot be read. Cannot load configuration.");
- }
-
- this.serviceConfigXmlPath = serviceConfigXmlPath;
- }
-
- public List<ControllerServiceNode> loadControllerServices(final ControllerServiceProvider provider) throws IOException {
- final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+ public static List<ControllerServiceNode> loadControllerServices(final ControllerServiceProvider provider, final InputStream serializedStream) throws IOException {
final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
- InputStream fis = null;
- BufferedInputStream bis = null;
documentBuilderFactory.setNamespaceAware(true);
final List<ControllerServiceNode> services = new ArrayList<>();
- try {
- final URL configurationResource = this.getClass().getResource("/ControllerServiceConfiguration.xsd");
- if (configurationResource == null) {
- throw new NullPointerException("Unable to load XML Schema for ControllerServiceConfiguration");
- }
- final Schema schema = schemaFactory.newSchema(configurationResource);
- documentBuilderFactory.setSchema(schema);
+ try (final InputStream in = new BufferedInputStream(serializedStream)) {
final DocumentBuilder builder = documentBuilderFactory.newDocumentBuilder();
builder.setErrorHandler(new org.xml.sax.ErrorHandler() {
@@ -112,42 +83,33 @@ public class ControllerServiceLoader {
});
//if controllerService.xml does not exist, create an empty file...
- fis = Files.newInputStream(this.serviceConfigXmlPath, StandardOpenOption.READ);
- bis = new BufferedInputStream(fis);
- if (Files.size(this.serviceConfigXmlPath) > 0) {
- final Document document = builder.parse(bis);
- final NodeList servicesNodes = document.getElementsByTagName("services");
- final Element servicesElement = (Element) servicesNodes.item(0);
-
- final List<Element> serviceNodes = DomUtils.getChildElementsByTagName(servicesElement, "service");
- for (final Element serviceElement : serviceNodes) {
- //get properties for the specific controller task - id, name, class,
- //and schedulingPeriod must be set
- final String serviceId = DomUtils.getChild(serviceElement, "identifier").getTextContent().trim();
- final String serviceClass = DomUtils.getChild(serviceElement, "class").getTextContent().trim();
- final String availabilityName = DomUtils.getChild(serviceElement, "availability").getTextContent().trim();
-
- final Availability availability = Availability.valueOf(availabilityName);
-
- //set the class to be used for the configured controller task
- final ControllerServiceNode serviceNode = provider.createControllerService(serviceClass, serviceId, availability, false);
-
- //optional task-specific properties
- for (final Element optionalProperty : DomUtils.getChildElementsByTagName(serviceElement, "property")) {
- final String name = optionalProperty.getAttribute("name").trim();
- final String value = optionalProperty.getTextContent().trim();
- serviceNode.setProperty(name, value);
- }
-
- services.add(serviceNode);
- provider.enableControllerService(serviceNode);
+ final Document document = builder.parse(in);
+ final Element controllerServices = DomUtils.getChild(document.getDocumentElement(), "controllerServices");
+ final List<Element> serviceNodes = DomUtils.getChildElementsByTagName(controllerServices, "controllerService");
+ for (final Element serviceElement : serviceNodes) {
+ //get properties for the specific controller task - id, name, class,
+ //and schedulingPeriod must be set
+ final String serviceId = DomUtils.getChild(serviceElement, "id").getTextContent().trim();
+ final String serviceClass = DomUtils.getChild(serviceElement, "class").getTextContent().trim();
+ final String availabilityName = DomUtils.getChild(serviceElement, "availability").getTextContent().trim();
+
+ final Availability availability = Availability.valueOf(availabilityName);
+
+ //set the class to be used for the configured controller task
+ final ControllerServiceNode serviceNode = provider.createControllerService(serviceClass, serviceId, availability, false);
+
+ //optional task-specific properties
+ for (final Element optionalProperty : DomUtils.getChildElementsByTagName(serviceElement, "property")) {
+ final String name = optionalProperty.getAttribute("name").trim();
+ final String value = optionalProperty.getTextContent().trim();
+ serviceNode.setProperty(name, value);
}
+
+ services.add(serviceNode);
+ provider.enableControllerService(serviceNode);
}
} catch (SAXException | ParserConfigurationException sxe) {
throw new IOException(sxe);
- } finally {
- FileUtils.closeQuietly(fis);
- FileUtils.closeQuietly(bis);
}
return services;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4ae2a10e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java
index 4f3afaf..8cab916 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java
@@ -27,7 +27,6 @@ import org.apache.nifi.controller.FlowSerializationException;
import org.apache.nifi.controller.FlowSynchronizationException;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.UninheritableFlowException;
-import org.apache.nifi.controller.service.ControllerServiceNode;
/**
* Interface to define service methods for FlowController configuration.
@@ -121,15 +120,4 @@ public interface FlowConfigurationDAO {
*/
List<ReportingTaskNode> loadReportingTasks(FlowController controller) throws IOException;
- /**
- * Instantiates all controller services from the file used in the
- * constructor
- *
- * @param controller
- * @return
- * @throws java.io.IOException
- * @returns all of the ReportingTasks that were instantiated & scheduled
- */
- List<ControllerServiceNode> loadControllerServices(FlowController controller) throws IOException;
-
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4ae2a10e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
index 039b2c2..3000869 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
@@ -54,10 +54,7 @@ import org.apache.nifi.controller.StandardFlowSynchronizer;
import org.apache.nifi.controller.UninheritableFlowException;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
-import org.apache.nifi.controller.service.ControllerServiceLoader;
-import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.encrypt.StringEncryptor;
-import org.apache.nifi.util.file.FileUtils;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingInitializationContext;
@@ -65,7 +62,7 @@ import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.DomUtils;
import org.apache.nifi.util.NiFiProperties;
-
+import org.apache.nifi.util.file.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.DOMException;
@@ -81,7 +78,6 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD
private final Path flowXmlPath;
private final Path taskConfigXmlPath;
- private final ControllerServiceLoader servicerLoader;
private final StringEncryptor encryptor;
private static final Logger LOG = LoggerFactory.getLogger(StandardXMLFlowConfigurationDAO.class);
@@ -103,7 +99,6 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD
this.flowXmlPath = flowXml;
this.taskConfigXmlPath = taskConfigXml;
- this.servicerLoader = new ControllerServiceLoader(serviceConfigXml);
this.encryptor = encryptor;
}
@@ -292,10 +287,6 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD
return tasks;
}
- @Override
- public List<ControllerServiceNode> loadControllerServices(final FlowController controller) throws IOException {
- return servicerLoader.loadControllerServices(controller);
- }
private Document parse(final File xmlFile, final URL schemaUrl) throws SAXException, ParserConfigurationException, IOException {
final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);