You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/30 19:01:10 UTC

[7/7] incubator-nifi git commit: Load Controller Services from flow.tar file instead and external file

Load Controller Services from flow.tar file instead and external file


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/4ae2a10e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/4ae2a10e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/4ae2a10e

Branch: refs/heads/NIFI-250
Commit: 4ae2a10e6d130353ea34aa403d3bcbad7a2ab4e8
Parents: 52149d8
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 30 13:01:00 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 30 13:01:00 2015 -0500

----------------------------------------------------------------------
 .../cluster/manager/impl/WebClusterManager.java | 42 ++++------
 .../spring/WebClusterManagerFactoryBean.java    |  6 --
 .../service/ControllerServiceLoader.java        | 88 ++++++--------------
 .../nifi/persistence/FlowConfigurationDAO.java  | 12 ---
 .../StandardXMLFlowConfigurationDAO.java        | 11 +--
 5 files changed, 41 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4ae2a10e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 50d7087..d3cf6a1 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -16,13 +16,12 @@
  */
 package org.apache.nifi.cluster.manager.impl;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.net.URI;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -55,7 +54,6 @@ import javax.net.ssl.SSLContext;
 import javax.ws.rs.HttpMethod;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.StreamingOutput;
-import javax.xml.XMLConstants;
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
@@ -65,9 +63,6 @@ import javax.xml.transform.TransformerException;
 import javax.xml.transform.TransformerFactory;
 import javax.xml.transform.dom.DOMSource;
 import javax.xml.transform.stream.StreamResult;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
-import javax.xml.validation.Validator;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.admin.service.AuditService;
@@ -140,6 +135,7 @@ import org.apache.nifi.controller.reporting.StandardReportingInitializationConte
 import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent;
 import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
 import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
+import org.apache.nifi.controller.service.ControllerServiceLoader;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.controller.service.StandardControllerServiceProvider;
@@ -444,14 +440,20 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                 }
 
                 // load flow
+                final ClusterDataFlow clusterDataFlow;
                 if (dataFlowManagementService.isFlowCurrent()) {
-                    final ClusterDataFlow clusterDataFlow = dataFlowManagementService.loadDataFlow();
+                    clusterDataFlow = dataFlowManagementService.loadDataFlow();
                     cachedDataFlow = clusterDataFlow.getDataFlow();
                     primaryNodeId = clusterDataFlow.getPrimaryNodeId();
                 } else {
                     throw new IOException("Flow is not current.");
                 }
 
+                final byte[] serializedServices = clusterDataFlow.getControllerServices();
+                if ( serializedServices != null && serializedServices.length > 0 ) {
+                	ControllerServiceLoader.loadControllerServices(this, new ByteArrayInputStream(serializedServices));
+                }
+                
                 // start multicast broadcasting service, if configured
                 if (servicesBroadcaster != null) {
                     servicesBroadcaster.start();
@@ -461,8 +463,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                 executeSafeModeTask();
 
                 // Load and start running Reporting Tasks
-                final File taskFile = new File(properties.getProperty(NiFiProperties.TASK_CONFIGURATION_FILE));
-                reportingTasks.putAll(loadReportingTasks(taskFile));
+                reportingTasks.putAll(loadReportingTasks(clusterDataFlow.getReportingTasks()));
             } catch (final IOException ioe) {
                 logger.warn("Failed to initialize cluster services due to: " + ioe, ioe);
                 stop();
@@ -876,16 +877,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         reconnectionThread.start();
     }
 
-    private Map<String, ReportingTaskNode> loadReportingTasks(final File taskConfigXml) {
+    private Map<String, ReportingTaskNode> loadReportingTasks(final byte[] serialized) {
         final Map<String, ReportingTaskNode> tasks = new HashMap<>();
-        if (taskConfigXml == null) {
-            logger.info("No controller tasks to start");
-            return tasks;
-        }
 
         try {
-            final URL schemaUrl = getClass().getResource("/ReportingTaskConfiguration.xsd");
-            final Document document = parse(taskConfigXml, schemaUrl);
+            final Document document = parse(serialized);
 
             final NodeList tasksNodes = document.getElementsByTagName("tasks");
             final Element tasksElement = (Element) tasksNodes.item(0);
@@ -957,7 +953,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                 tasks.put(reportingTaskNode.getIdentifier(), reportingTaskNode);
             }
         } catch (final SAXException | ParserConfigurationException | IOException | DOMException | NumberFormatException | InitializationException t) {
-            logger.error("Unable to load reporting tasks from {} due to {}", new Object[]{taskConfigXml, t});
+            logger.error("Unable to load reporting tasks due to {}", new Object[]{t});
             if (logger.isDebugEnabled()) {
                 logger.error("", t);
             }
@@ -999,11 +995,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         return taskNode;
     }
 
-    private Document parse(final File xmlFile, final URL schemaUrl) throws SAXException, ParserConfigurationException, IOException {
-        final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
-        final Schema schema = schemaFactory.newSchema(schemaUrl);
+    private Document parse(final byte[] serialized) throws SAXException, ParserConfigurationException, IOException {
         final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
-        docFactory.setSchema(schema);
         final DocumentBuilder builder = docFactory.newDocumentBuilder();
 
         builder.setErrorHandler(new org.xml.sax.ErrorHandler() {
@@ -1036,12 +1029,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         });
 
         // build the docuemnt
-        final Document document = builder.parse(xmlFile);
-
-        // ensure schema compliance
-        final Validator validator = schema.newValidator();
-        validator.validate(new DOMSource(document));
-
+        final Document document = builder.parse(new ByteArrayInputStream(serialized));
         return document;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4ae2a10e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
index 7169730..3881461 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.cluster.spring;
 
-import java.nio.file.Paths;
 import org.apache.nifi.admin.service.AuditService;
 import org.apache.nifi.cluster.event.EventManager;
 import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
@@ -26,7 +25,6 @@ import org.apache.nifi.cluster.manager.HttpResponseMapper;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
 import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderListener;
 import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster;
-import org.apache.nifi.controller.service.ControllerServiceLoader;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.io.socket.multicast.DiscoverableService;
 import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
@@ -106,10 +104,6 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
 
             // set the audit service
             clusterManager.setAuditService(applicationContext.getBean("auditService", AuditService.class));
-
-            // load the controller services
-            final ControllerServiceLoader serviceLoader = new ControllerServiceLoader(Paths.get(serviceConfigurationFile));
-            serviceLoader.loadControllerServices(clusterManager);
         }
         return clusterManager;
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4ae2a10e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index 1ca6e54..01114e5 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@ -17,31 +17,21 @@
 package org.apache.nifi.controller.service;
 
 import java.io.BufferedInputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.List;
 
-import javax.xml.XMLConstants;
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.nifi.controller.Availability;
 import org.apache.nifi.util.DomUtils;
-import org.apache.nifi.util.file.FileUtils;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
-import org.w3c.dom.NodeList;
 import org.xml.sax.SAXException;
 import org.xml.sax.SAXParseException;
 
@@ -52,33 +42,14 @@ public class ControllerServiceLoader {
 
     private static final Log logger = LogFactory.getLog(ControllerServiceLoader.class);
 
-    private final Path serviceConfigXmlPath;
 
-    public ControllerServiceLoader(final Path serviceConfigXmlPath) throws IOException {
-        final File serviceConfigXmlFile = serviceConfigXmlPath.toFile();
-        if (!serviceConfigXmlFile.exists() || !serviceConfigXmlFile.canRead()) {
-            throw new IOException(serviceConfigXmlPath + " does not appear to exist or cannot be read. Cannot load configuration.");
-        }
-
-        this.serviceConfigXmlPath = serviceConfigXmlPath;
-    }
-
-    public List<ControllerServiceNode> loadControllerServices(final ControllerServiceProvider provider) throws IOException {
-        final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+    public static List<ControllerServiceNode> loadControllerServices(final ControllerServiceProvider provider, final InputStream serializedStream) throws IOException {
         final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
-        InputStream fis = null;
-        BufferedInputStream bis = null;
         documentBuilderFactory.setNamespaceAware(true);
 
         final List<ControllerServiceNode> services = new ArrayList<>();
 
-        try {
-            final URL configurationResource = this.getClass().getResource("/ControllerServiceConfiguration.xsd");
-            if (configurationResource == null) {
-                throw new NullPointerException("Unable to load XML Schema for ControllerServiceConfiguration");
-            }
-            final Schema schema = schemaFactory.newSchema(configurationResource);
-            documentBuilderFactory.setSchema(schema);
+        try (final InputStream in = new BufferedInputStream(serializedStream)) {
             final DocumentBuilder builder = documentBuilderFactory.newDocumentBuilder();
 
             builder.setErrorHandler(new org.xml.sax.ErrorHandler() {
@@ -112,42 +83,33 @@ public class ControllerServiceLoader {
             });
 
             //if controllerService.xml does not exist, create an empty file...
-            fis = Files.newInputStream(this.serviceConfigXmlPath, StandardOpenOption.READ);
-            bis = new BufferedInputStream(fis);
-            if (Files.size(this.serviceConfigXmlPath) > 0) {
-                final Document document = builder.parse(bis);
-                final NodeList servicesNodes = document.getElementsByTagName("services");
-                final Element servicesElement = (Element) servicesNodes.item(0);
-
-                final List<Element> serviceNodes = DomUtils.getChildElementsByTagName(servicesElement, "service");
-                for (final Element serviceElement : serviceNodes) {
-                    //get properties for the specific controller task - id, name, class,
-                    //and schedulingPeriod must be set
-                    final String serviceId = DomUtils.getChild(serviceElement, "identifier").getTextContent().trim();
-                    final String serviceClass = DomUtils.getChild(serviceElement, "class").getTextContent().trim();
-                    final String availabilityName = DomUtils.getChild(serviceElement, "availability").getTextContent().trim();
-                    
-                    final Availability availability = Availability.valueOf(availabilityName);
-                    
-                    //set the class to be used for the configured controller task
-                    final ControllerServiceNode serviceNode = provider.createControllerService(serviceClass, serviceId, availability, false);
-
-                    //optional task-specific properties
-                    for (final Element optionalProperty : DomUtils.getChildElementsByTagName(serviceElement, "property")) {
-                        final String name = optionalProperty.getAttribute("name").trim();
-                        final String value = optionalProperty.getTextContent().trim();
-                        serviceNode.setProperty(name, value);
-                    }
-
-                    services.add(serviceNode);
-                    provider.enableControllerService(serviceNode);
+            final Document document = builder.parse(in);
+            final Element controllerServices = DomUtils.getChild(document.getDocumentElement(), "controllerServices");
+            final List<Element> serviceNodes = DomUtils.getChildElementsByTagName(controllerServices, "controllerService");
+            for (final Element serviceElement : serviceNodes) {
+                //get properties for the specific controller task - id, name, class,
+                //and schedulingPeriod must be set
+                final String serviceId = DomUtils.getChild(serviceElement, "id").getTextContent().trim();
+                final String serviceClass = DomUtils.getChild(serviceElement, "class").getTextContent().trim();
+                final String availabilityName = DomUtils.getChild(serviceElement, "availability").getTextContent().trim();
+                
+                final Availability availability = Availability.valueOf(availabilityName);
+                
+                //set the class to be used for the configured controller task
+                final ControllerServiceNode serviceNode = provider.createControllerService(serviceClass, serviceId, availability, false);
+
+                //optional task-specific properties
+                for (final Element optionalProperty : DomUtils.getChildElementsByTagName(serviceElement, "property")) {
+                    final String name = optionalProperty.getAttribute("name").trim();
+                    final String value = optionalProperty.getTextContent().trim();
+                    serviceNode.setProperty(name, value);
                 }
+
+                services.add(serviceNode);
+                provider.enableControllerService(serviceNode);
             }
         } catch (SAXException | ParserConfigurationException sxe) {
             throw new IOException(sxe);
-        } finally {
-            FileUtils.closeQuietly(fis);
-            FileUtils.closeQuietly(bis);
         }
 
         return services;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4ae2a10e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java
index 4f3afaf..8cab916 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java
@@ -27,7 +27,6 @@ import org.apache.nifi.controller.FlowSerializationException;
 import org.apache.nifi.controller.FlowSynchronizationException;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.UninheritableFlowException;
-import org.apache.nifi.controller.service.ControllerServiceNode;
 
 /**
  * Interface to define service methods for FlowController configuration.
@@ -121,15 +120,4 @@ public interface FlowConfigurationDAO {
      */
     List<ReportingTaskNode> loadReportingTasks(FlowController controller) throws IOException;
 
-    /**
-     * Instantiates all controller services from the file used in the
-     * constructor
-     *
-     * @param controller
-     * @return 
-     * @throws java.io.IOException
-     * @returns all of the ReportingTasks that were instantiated & scheduled
-     */
-    List<ControllerServiceNode> loadControllerServices(FlowController controller) throws IOException;
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4ae2a10e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
index 039b2c2..3000869 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
@@ -54,10 +54,7 @@ import org.apache.nifi.controller.StandardFlowSynchronizer;
 import org.apache.nifi.controller.UninheritableFlowException;
 import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
 import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
-import org.apache.nifi.controller.service.ControllerServiceLoader;
-import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.encrypt.StringEncryptor;
-import org.apache.nifi.util.file.FileUtils;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.reporting.ReportingInitializationContext;
@@ -65,7 +62,7 @@ import org.apache.nifi.reporting.ReportingTask;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.util.DomUtils;
 import org.apache.nifi.util.NiFiProperties;
-
+import org.apache.nifi.util.file.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.w3c.dom.DOMException;
@@ -81,7 +78,6 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD
 
     private final Path flowXmlPath;
     private final Path taskConfigXmlPath;
-    private final ControllerServiceLoader servicerLoader;
     private final StringEncryptor encryptor;
 
     private static final Logger LOG = LoggerFactory.getLogger(StandardXMLFlowConfigurationDAO.class);
@@ -103,7 +99,6 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD
 
         this.flowXmlPath = flowXml;
         this.taskConfigXmlPath = taskConfigXml;
-        this.servicerLoader = new ControllerServiceLoader(serviceConfigXml);
         this.encryptor = encryptor;
     }
 
@@ -292,10 +287,6 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD
         return tasks;
     }
 
-    @Override
-    public List<ControllerServiceNode> loadControllerServices(final FlowController controller) throws IOException {
-        return servicerLoader.loadControllerServices(controller);
-    }
 
     private Document parse(final File xmlFile, final URL schemaUrl) throws SAXException, ParserConfigurationException, IOException {
         final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);