You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/30 19:01:04 UTC
[1/7] incubator-nifi git commit: NIFI-250: Do not try to load
controller services and reporting tasks from old .xml files because they are
now in flow.xml
Repository: incubator-nifi
Updated Branches:
refs/heads/NIFI-250 e7beef8d2 -> 4ae2a10e6
NIFI-250: Do not try to load controller services and reporting tasks from old .xml files because they are now in flow.xml
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/9a6acab3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/9a6acab3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/9a6acab3
Branch: refs/heads/NIFI-250
Commit: 9a6acab3734fbbb670fb5db2388ac85ef9b6fc6d
Parents: 60ad998
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Jan 29 11:24:56 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Jan 29 11:24:56 2015 -0500
----------------------------------------------------------------------
.../java/org/apache/nifi/controller/StandardFlowService.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a6acab3/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index d459b00..790485c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -608,7 +608,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
if (firstControllerInitialization) {
// load the controller services
logger.debug("Loading controller services");
- dao.loadControllerServices(controller);
+// dao.loadControllerServices(controller);
}
// load the flow
@@ -625,7 +625,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
logger.debug("First controller initialization. Loading reporting tasks and initializing controller.");
// load the controller tasks
- dao.loadReportingTasks(controller);
+// dao.loadReportingTasks(controller);
// initialize the flow
controller.initializeFlow();
[6/7] incubator-nifi git commit: NIFI-250: Change Availability to
just NODE or NCM;
update NCM to store controller services and reporting tasks in separate
entries in tar file instead of in flow.xml
Posted by ma...@apache.org.
NIFI-250: Change Availability to just NODE or NCM; update NCM to store controller services and reporting tasks in separate entries in tar file instead of in flow.xml
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/52149d85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/52149d85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/52149d85
Branch: refs/heads/NIFI-250
Commit: 52149d8510d8c517063bfd6359bde3950862db77
Parents: 3102e08
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 30 12:44:30 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 30 12:44:30 2015 -0500
----------------------------------------------------------------------
.../nifi/cluster/flow/ClusterDataFlow.java | 15 +-
.../cluster/flow/DataFlowManagementService.java | 17 ++
.../nifi/cluster/flow/impl/DataFlowDaoImpl.java | 16 +-
.../impl/DataFlowManagementServiceImpl.java | 65 ++++++-
.../cluster/manager/impl/WebClusterManager.java | 175 +++++++++++++++++--
.../reporting/ClusteredReportingTaskNode.java | 3 +-
.../apache/nifi/controller/Availability.java | 12 +-
.../nifi/controller/ReportingTaskNode.java | 2 -
.../service/ControllerServiceNode.java | 2 -
.../service/ControllerServiceProvider.java | 7 +-
.../apache/nifi/controller/FlowController.java | 65 +++----
.../nifi/controller/StandardFlowSerializer.java | 20 +--
.../controller/StandardFlowSynchronizer.java | 6 +-
.../reporting/AbstractReportingTaskNode.java | 12 +-
.../reporting/StandardReportingTaskNode.java | 3 +-
.../service/ControllerServiceLoader.java | 8 +-
.../service/StandardControllerServiceNode.java | 12 +-
.../StandardControllerServiceProvider.java | 9 +-
.../nifi/web/controller/ControllerFacade.java | 9 +-
19 files changed, 337 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
index eedb88f..c17b429 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
@@ -27,14 +27,25 @@ import org.apache.nifi.cluster.protocol.StandardDataFlow;
public class ClusterDataFlow {
private final StandardDataFlow dataFlow;
-
private final NodeIdentifier primaryNodeId;
+ private final byte[] controllerServices;
+ private final byte[] reportingTasks;
- public ClusterDataFlow(final StandardDataFlow dataFlow, final NodeIdentifier primaryNodeId) {
+ public ClusterDataFlow(final StandardDataFlow dataFlow, final NodeIdentifier primaryNodeId, final byte[] controllerServices, final byte[] reportingTasks) {
this.dataFlow = dataFlow;
this.primaryNodeId = primaryNodeId;
+ this.controllerServices = controllerServices;
+ this.reportingTasks = reportingTasks;
}
+ public byte[] getControllerServices() {
+ return controllerServices;
+ }
+
+ public byte[] getReportingTasks() {
+ return reportingTasks;
+ }
+
public NodeIdentifier getPrimaryNodeId() {
return primaryNodeId;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
index 339d904..082d65e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
@@ -17,6 +17,7 @@
package org.apache.nifi.cluster.flow;
import java.util.Set;
+
import org.apache.nifi.cluster.protocol.NodeIdentifier;
/**
@@ -67,6 +68,22 @@ public interface DataFlowManagementService {
void updatePrimaryNode(NodeIdentifier nodeId) throws DaoException;
/**
+ * Updates the dataflow with the given serialized form of the Controller Services that are to exist on the NCM.
+ *
+ * @param serializedControllerServices
+ * @throws DaoException
+ */
+ void updateControllerServices(byte[] serializedControllerServices) throws DaoException;
+
+ /**
+ * Updates the dataflow with the given serialized form of Reporting Tasks that are to exist on the NCM.
+ *
+ * @param serializedReportingTasks
+ * @throws DaoException
+ */
+ void updateReportingTasks(byte[] serializedReportingTasks) throws DaoException;
+
+ /**
* Sets the state of the flow.
*
* @param flowState the state
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
index 72b594a..0d7caf3 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
@@ -111,6 +111,8 @@ public class DataFlowDaoImpl implements DataFlowDao {
public static final String FLOW_XML_FILENAME = "flow.xml";
public static final String TEMPLATES_FILENAME = "templates.xml";
public static final String SNIPPETS_FILENAME = "snippets.xml";
+ public static final String CONTROLLER_SERVICES_FILENAME = "controller-services.xml";
+ public static final String REPORTING_TASKS_FILENAME = "reporting-tasks.xml";
public static final String CLUSTER_INFO_FILENAME = "cluster-info.xml";
private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(DataFlowDaoImpl.class));
@@ -479,7 +481,9 @@ public class DataFlowDaoImpl implements DataFlowDao {
byte[] templateBytes = new byte[0];
byte[] snippetBytes = new byte[0];
byte[] clusterInfoBytes = new byte[0];
-
+ byte[] controllerServiceBytes = new byte[0];
+ byte[] reportingTaskBytes = new byte[0];
+
try (final InputStream inStream = new FileInputStream(file);
final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(inStream))) {
TarArchiveEntry tarEntry;
@@ -501,6 +505,14 @@ public class DataFlowDaoImpl implements DataFlowDao {
clusterInfoBytes = new byte[(int) tarEntry.getSize()];
StreamUtils.fillBuffer(tarIn, clusterInfoBytes, true);
break;
+ case CONTROLLER_SERVICES_FILENAME:
+ controllerServiceBytes = new byte[(int) tarEntry.getSize()];
+ StreamUtils.fillBuffer(tarIn, controllerServiceBytes, true);
+ break;
+ case REPORTING_TASKS_FILENAME:
+ reportingTaskBytes = new byte[(int) tarEntry.getSize()];
+ StreamUtils.fillBuffer(tarIn, reportingTaskBytes, true);
+ break;
default:
throw new DaoException("Found Unexpected file in dataflow configuration: " + tarEntry.getName());
}
@@ -518,7 +530,7 @@ public class DataFlowDaoImpl implements DataFlowDao {
final StandardDataFlow dataFlow = new StandardDataFlow(flowBytes, templateBytes, snippetBytes);
dataFlow.setAutoStartProcessors(autoStart);
- return new ClusterDataFlow(dataFlow, (clusterMetadata == null) ? null : clusterMetadata.getPrimaryNodeId());
+ return new ClusterDataFlow(dataFlow, (clusterMetadata == null) ? null : clusterMetadata.getPrimaryNodeId(), controllerServiceBytes, reportingTaskBytes);
}
private void writeDataFlow(final File file, final ClusterDataFlow clusterDataFlow) throws IOException, JAXBException {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
index e135af3..1bb8ca3 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
@@ -41,7 +41,6 @@ import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
import org.apache.nifi.logging.NiFiLog;
import org.apache.nifi.util.FormatUtils;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -154,17 +153,74 @@ public class DataFlowManagementServiceImpl implements DataFlowManagementService
final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
final StandardDataFlow dataFlow;
+ final byte[] controllerServiceBytes;
+ final byte[] reportingTaskBytes;
if (existingClusterDataFlow == null) {
dataFlow = null;
+ controllerServiceBytes = new byte[0];
+ reportingTaskBytes = new byte[0];
} else {
dataFlow = existingClusterDataFlow.getDataFlow();
+ controllerServiceBytes = existingClusterDataFlow.getControllerServices();
+ reportingTaskBytes = existingClusterDataFlow.getReportingTasks();
}
- flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId));
+ flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId, controllerServiceBytes, reportingTaskBytes));
} finally {
resourceLock.unlock("updatePrimaryNode");
}
}
+
+
+ @Override
+ public void updateControllerServices(final byte[] controllerServiceBytes) throws DaoException {
+ resourceLock.lock();
+ try {
+ final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
+
+ final StandardDataFlow dataFlow;
+ final byte[] reportingTaskBytes;
+ final NodeIdentifier nodeId;
+ if (existingClusterDataFlow == null) {
+ dataFlow = null;
+ nodeId = null;
+ reportingTaskBytes = new byte[0];
+ } else {
+ dataFlow = existingClusterDataFlow.getDataFlow();
+ nodeId = existingClusterDataFlow.getPrimaryNodeId();
+ reportingTaskBytes = existingClusterDataFlow.getReportingTasks();
+ }
+
+ flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId, controllerServiceBytes, reportingTaskBytes));
+ } finally {
+ resourceLock.unlock("updateControllerServices");
+ }
+ }
+
+ /**
+ * Persists the given serialized Reporting Tasks, carrying over the dataflow, primary node id and
+ * Controller Services of the currently stored cluster dataflow. If no cluster dataflow can be loaded,
+ * the dataflow and primary node id are null and the Controller Services are an empty array.
+ *
+ * @param reportingTaskBytes the serialized form of the Reporting Tasks to store
+ * @throws DaoException declared by the interface method; presumably raised by the underlying DAO load/save
+ */
+ @Override
+ public void updateReportingTasks(final byte[] reportingTaskBytes) throws DaoException {
+ resourceLock.lock();
+ try {
+ final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
+
+ final StandardDataFlow dataFlow;
+ final byte[] controllerServiceBytes;
+ final NodeIdentifier nodeId;
+ if (existingClusterDataFlow == null) {
+ dataFlow = null;
+ nodeId = null;
+ // empty array rather than null, consistent with updatePrimaryNode and updateControllerServices
+ controllerServiceBytes = new byte[0];
+ } else {
+ dataFlow = existingClusterDataFlow.getDataFlow();
+ nodeId = existingClusterDataFlow.getPrimaryNodeId();
+ controllerServiceBytes = existingClusterDataFlow.getControllerServices();
+ }
+
+ flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId, controllerServiceBytes, reportingTaskBytes));
+ } finally {
+ // unlock label names this method, not updateControllerServices
+ resourceLock.unlock("updateReportingTasks");
+ }
+ }
@Override
public PersistedFlowState getPersistedFlowState() {
@@ -303,9 +359,10 @@ public class DataFlowManagementServiceImpl implements DataFlowManagementService
final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
final ClusterDataFlow currentClusterDataFlow;
if (existingClusterDataFlow == null) {
- currentClusterDataFlow = new ClusterDataFlow(dataFlow, null);
+ currentClusterDataFlow = new ClusterDataFlow(dataFlow, null, new byte[0], new byte[0]);
} else {
- currentClusterDataFlow = new ClusterDataFlow(dataFlow, existingClusterDataFlow.getPrimaryNodeId());
+ currentClusterDataFlow = new ClusterDataFlow(dataFlow, existingClusterDataFlow.getPrimaryNodeId(),
+ existingClusterDataFlow.getControllerServices(), existingClusterDataFlow.getReportingTasks());
}
flowDao.saveDataFlow(currentClusterDataFlow);
flowDao.setPersistedFlowState(PersistedFlowState.CURRENT);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 9649fee..50d7087 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.cluster.manager.impl;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
@@ -58,13 +59,19 @@ import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import javax.xml.validation.Validator;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.admin.service.AuditService;
+import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.cluster.BulletinsPayload;
import org.apache.nifi.cluster.ClusterNodeInformation;
import org.apache.nifi.cluster.HeartbeatPayload;
@@ -125,6 +132,7 @@ import org.apache.nifi.controller.Availability;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.Heartbeater;
import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.StandardFlowSerializer;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
@@ -170,6 +178,7 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.DomUtils;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.NodeDTO;
@@ -305,7 +314,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
private final ClusterManagerLock writeLock = new ClusterManagerLock(resourceRWLock.writeLock(), "Write");
private final Set<Node> nodes = new HashSet<>();
- private final Set<ReportingTaskNode> reportingTasks = new HashSet<>();
+ private final Map<String, ReportingTaskNode> reportingTasks = new HashMap<>();
// null means the dataflow should be read from disk
private StandardDataFlow cachedDataFlow = null;
@@ -453,7 +462,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
// Load and start running Reporting Tasks
final File taskFile = new File(properties.getProperty(NiFiProperties.TASK_CONFIGURATION_FILE));
- reportingTasks.addAll(loadReportingTasks(taskFile));
+ reportingTasks.putAll(loadReportingTasks(taskFile));
} catch (final IOException ioe) {
logger.warn("Failed to initialize cluster services due to: " + ioe, ioe);
stop();
@@ -867,8 +876,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
reconnectionThread.start();
}
- private List<ReportingTaskNode> loadReportingTasks(final File taskConfigXml) {
- final List<ReportingTaskNode> tasks = new ArrayList<>();
+ private Map<String, ReportingTaskNode> loadReportingTasks(final File taskConfigXml) {
+ final Map<String, ReportingTaskNode> tasks = new HashMap<>();
if (taskConfigXml == null) {
logger.info("No controller tasks to start");
return tasks;
@@ -945,7 +954,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
processScheduler.schedule(reportingTaskNode);
- tasks.add(reportingTaskNode);
+ tasks.put(reportingTaskNode.getIdentifier(), reportingTaskNode);
}
} catch (final SAXException | ParserConfigurationException | IOException | DOMException | NumberFormatException | InitializationException t) {
logger.error("Unable to load reporting tasks from {} due to {}", new Object[]{taskConfigXml, t});
@@ -1295,8 +1304,15 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
@Override
- public ControllerServiceNode createControllerService(final String type, final boolean firstTimeAdded) {
- return controllerServiceProvider.createControllerService(type, firstTimeAdded);
+ public ControllerServiceNode createControllerService(final String type, final Availability availability, final boolean firstTimeAdded) {
+ if ( availability == null ) {
+ throw new NullPointerException("availability is null");
+ }
+ if ( availability == Availability.NODE ) {
+ throw new IllegalArgumentException("Cannot create Controller Service with Availability 'NODE' on the Cluster Manager");
+ }
+
+ return controllerServiceProvider.createControllerService(type, availability, firstTimeAdded);
}
/**
@@ -1308,8 +1324,15 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
* @return
*/
@Override
- public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
- return controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+ public ControllerServiceNode createControllerService(final String type, final String id, final Availability availability, final boolean firstTimeAdded) {
+ if ( availability == null ) {
+ throw new NullPointerException("availability is null");
+ }
+ if ( availability == Availability.NODE ) {
+ throw new IllegalArgumentException("Cannot create Controller Service with Availability 'NODE' on the Cluster Manager");
+ }
+
+ return controllerServiceProvider.createControllerService(type, id, availability, firstTimeAdded);
}
@Override
@@ -1345,21 +1368,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
@Override
public void enableControllerService(final ControllerServiceNode serviceNode) {
- if ( serviceNode.getAvailability() == Availability.NODE_ONLY ) {
- serviceNode.setDisabled(false); // update disabled flag to stay in sync across cluster
- return;
- }
-
controllerServiceProvider.enableControllerService(serviceNode);
}
@Override
public void disableControllerService(final ControllerServiceNode serviceNode) {
- if ( serviceNode.getAvailability() == Availability.NODE_ONLY ) {
- serviceNode.setDisabled(true); // update disabled flag to stay in sync across cluster
- return;
- }
-
controllerServiceProvider.disableControllerService(serviceNode);
}
@@ -1368,6 +1381,130 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
return controllerServiceProvider.getAllControllerServices();
}
+
+ private byte[] serialize(final Document doc) throws TransformerException {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final DOMSource domSource = new DOMSource(doc);
+ final StreamResult streamResult = new StreamResult(baos);
+
+ // configure the transformer and convert the DOM
+ final TransformerFactory transformFactory = TransformerFactory.newInstance();
+ final Transformer transformer = transformFactory.newTransformer();
+ transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
+ transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+
+ // transform the document to byte stream
+ transformer.transform(domSource, streamResult);
+ return baos.toByteArray();
+ }
+
+ /**
+ * Builds a DOM document with a "controllerServices" root element, hands every Controller Service
+ * returned by getAllControllerServices() to StandardFlowSerializer.addControllerService for that
+ * root, and returns the document serialized to bytes.
+ *
+ * @return the serialized document
+ * @throws ParserConfigurationException if a DocumentBuilder cannot be created
+ * @throws TransformerException if the document cannot be transformed to bytes
+ */
+ private byte[] serializeControllerServices() throws ParserConfigurationException, TransformerException {
+ final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
+ final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
+ final Document document = docBuilder.newDocument();
+ final Element rootElement = document.createElement("controllerServices");
+ // createElement does not attach the element; without this the serialized document omits the root
+ document.appendChild(rootElement);
+
+ for ( final ControllerServiceNode serviceNode : getAllControllerServices() ) {
+ StandardFlowSerializer.addControllerService(rootElement, serviceNode, encryptor);
+ }
+
+ return serialize(document);
+ }
+
+ /**
+ * Builds a DOM document with a "reportingTasks" root element, hands every Reporting Task returned
+ * by getReportingTasks() to StandardFlowSerializer.addReportingTask for that root, and returns the
+ * document serialized to bytes.
+ *
+ * @return the serialized document
+ * @throws ParserConfigurationException if a DocumentBuilder cannot be created
+ * @throws TransformerException if the document cannot be transformed to bytes
+ */
+ private byte[] serializeReportingTasks() throws ParserConfigurationException, TransformerException {
+ final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
+ final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
+ final Document document = docBuilder.newDocument();
+ // root is named for its contents; the original reused "controllerServices" here
+ final Element rootElement = document.createElement("reportingTasks");
+ // createElement does not attach the element; without this the serialized document omits the root
+ document.appendChild(rootElement);
+
+ for ( final ReportingTaskNode taskNode : getReportingTasks() ) {
+ StandardFlowSerializer.addReportingTask(rootElement, taskNode, encryptor);
+ }
+
+ return serialize(document);
+ }
+
+
+ public void saveControllerServices() {
+ try {
+ dataFlowManagementService.updateControllerServices(serializeControllerServices());
+ } catch (final Exception e) {
+ logger.error("Failed to save changes to NCM's Controller Services; changes may be lost on restart due to " + e);
+ if ( logger.isDebugEnabled() ) {
+ logger.error("", e);
+ }
+
+ getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Controller Services", Severity.ERROR.name(),
+ "Failed to save changes to NCM's Controller Services; changes may be lost on restart. See logs for more details."));
+ }
+ }
+
+ public void saveReportingTasks() {
+ try {
+ dataFlowManagementService.updateReportingTasks(serializeReportingTasks());
+ } catch (final Exception e) {
+ logger.error("Failed to save changes to NCM's Reporting Tasks; changes may be lost on restart due to " + e);
+ if ( logger.isDebugEnabled() ) {
+ logger.error("", e);
+ }
+
+ getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Reporting Tasks", Severity.ERROR.name(),
+ "Failed to save changes to NCM's Reporting Tasks; changes may be lost on restart. See logs for more details."));
+ }
+ }
+
+
+ public Set<ReportingTaskNode> getReportingTasks() {
+ readLock.lock();
+ try {
+ return new HashSet<>(reportingTasks.values());
+ } finally {
+ readLock.unlock("getReportingTasks");
+ }
+ }
+
+
+ public ReportingTaskNode getReportingTaskNode(final String taskId) {
+ readLock.lock();
+ try {
+ return reportingTasks.get(taskId);
+ } finally {
+ readLock.unlock("getReportingTaskNode");
+ }
+ }
+
+ public void startReportingTask(final ReportingTaskNode reportingTaskNode) {
+ reportingTaskNode.verifyCanStart();
+ processScheduler.schedule(reportingTaskNode);
+ }
+
+
+ public void stopReportingTask(final ReportingTaskNode reportingTaskNode) {
+ reportingTaskNode.verifyCanStop();
+ processScheduler.unschedule(reportingTaskNode);
+ }
+
+ public void removeReportingTask(final ReportingTaskNode reportingTaskNode) {
+ writeLock.lock();
+ try {
+ final ReportingTaskNode existing = reportingTasks.get(reportingTaskNode.getIdentifier());
+ if ( existing == null || existing != reportingTaskNode ) {
+ throw new IllegalStateException("Reporting Task " + reportingTaskNode + " does not exist in this Flow");
+ }
+
+ reportingTaskNode.verifyCanDelete();
+
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, reportingTaskNode.getReportingTask(), reportingTaskNode.getConfigurationContext());
+ }
+
+ reportingTasks.remove(reportingTaskNode.getIdentifier());
+ } finally {
+ writeLock.unlock("removeReportingTask");
+ }
+ }
+
+
/**
* Handle a bulletins message.
*
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java
index 1ed5b30..5477d8e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java
@@ -17,6 +17,7 @@
package org.apache.nifi.controller.reporting;
import org.apache.nifi.cluster.manager.impl.ClusteredReportingContext;
+import org.apache.nifi.controller.Availability;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.service.ControllerServiceProvider;
@@ -34,7 +35,7 @@ public class ClusteredReportingTaskNode extends AbstractReportingTaskNode {
public ClusteredReportingTaskNode(final ReportingTask reportingTask, final String id, final ProcessScheduler scheduler,
final EventAccess eventAccess, final BulletinRepository bulletinRepository, final ControllerServiceProvider serviceProvider,
final ValidationContextFactory validationContextFactory) {
- super(reportingTask, id, serviceProvider, scheduler, validationContextFactory);
+ super(reportingTask, id, Availability.NCM, serviceProvider, scheduler, validationContextFactory);
this.eventAccess = eventAccess;
this.bulletinRepository = bulletinRepository;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Availability.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Availability.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Availability.java
index 38df6f7..44987e2 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Availability.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Availability.java
@@ -18,7 +18,13 @@ package org.apache.nifi.controller;
public enum Availability {
- CLUSTER_MANAGER_ONLY,
- NODE_ONLY,
- BOTH;
+ /**
+ * Service or reporting task will run only on the NiFi Cluster Manager (NCM)
+ */
+ NCM,
+
+ /**
+ * Service or reporting task will run only on NiFi Nodes (or standalone instance, if not clustered)
+ */
+ NODE;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
index c4960a9..b26bf82 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
@@ -26,8 +26,6 @@ public interface ReportingTaskNode extends ConfiguredComponent {
Availability getAvailability();
- void setAvailability(Availability availability);
-
void setSchedulingStrategy(SchedulingStrategy schedulingStrategy);
SchedulingStrategy getSchedulingStrategy();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index bf6b888..298a224 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -28,8 +28,6 @@ public interface ControllerServiceNode extends ConfiguredComponent {
Availability getAvailability();
- void setAvailability(Availability availability);
-
boolean isDisabled();
void setDisabled(boolean disabled);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
index 42d6609..a99452e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
@@ -19,6 +19,7 @@ package org.apache.nifi.controller.service;
import java.util.Set;
import org.apache.nifi.annotation.lifecycle.OnAdded;
+import org.apache.nifi.controller.Availability;
import org.apache.nifi.controller.ControllerServiceLookup;
/**
@@ -31,10 +32,11 @@ public interface ControllerServiceProvider extends ControllerServiceLookup {
* is true, calls any methods that are annotated with {@link OnAdded}
*
* @param type
+ * @param availabilty
* @param firstTimeAdded
* @return
*/
- ControllerServiceNode createControllerService(String type, boolean firstTimeAdded);
+ ControllerServiceNode createControllerService(String type, Availability availabilty, boolean firstTimeAdded);
/**
* Creates a new Controller Service of the specified type and assigns it the given id. If <code>firstTimeadded</code>
@@ -42,10 +44,11 @@ public interface ControllerServiceProvider extends ControllerServiceLookup {
*
* @param type
* @param id
+ * @param availabilty
* @param firstTimeAdded
* @return
*/
- ControllerServiceNode createControllerService(String type, String id, boolean firstTimeAdded);
+ ControllerServiceNode createControllerService(String type, String id, Availability availabilty, boolean firstTimeAdded);
/**
* Gets the controller service node for the specified identifier. Returns
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 347d6b9..c9c41f2 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2465,14 +2465,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
}
public ReportingTaskNode createReportingTask(final String type) throws ReportingTaskInstantiationException {
- return createReportingTask(type, true);
+ return createReportingTask(type, Availability.NODE, true);
}
- public ReportingTaskNode createReportingTask(final String type, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
- return createReportingTask(type, UUID.randomUUID().toString(), firstTimeAdded);
+ public ReportingTaskNode createReportingTask(final String type, final Availability availability, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
+ return createReportingTask(type, UUID.randomUUID().toString(), availability, firstTimeAdded);
}
- public ReportingTaskNode createReportingTask(final String type, final String id, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
+ public ReportingTaskNode createReportingTask(final String type, final String id, final Availability availability, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
if (type == null || id == null) {
throw new NullPointerException();
}
@@ -2520,36 +2520,21 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
}
public void startReportingTask(final ReportingTaskNode reportingTaskNode) {
- if ( reportingTaskNode.getAvailability() == Availability.CLUSTER_MANAGER_ONLY ) {
- reportingTaskNode.setScheduledState(ScheduledState.RUNNING); // updated scheduled state to keep state in sync across cluster
- return;
- }
-
if (isTerminated()) {
throw new IllegalStateException("Cannot start reporting task " + reportingTaskNode + " because the controller is terminated");
}
- if ( reportingTaskNode.getAvailability() == Availability.CLUSTER_MANAGER_ONLY ) {
- return;
- }
-
reportingTaskNode.verifyCanStart();
processScheduler.schedule(reportingTaskNode);
}
public void stopReportingTask(final ReportingTaskNode reportingTaskNode) {
- if ( reportingTaskNode.getAvailability() == Availability.CLUSTER_MANAGER_ONLY ) {
- reportingTaskNode.setScheduledState(ScheduledState.STOPPED); // updated scheduled state to keep state in sync across cluster
- return;
- }
-
if (isTerminated()) {
return;
}
reportingTaskNode.verifyCanStop();
-
processScheduler.unschedule(reportingTaskNode);
}
@@ -2650,53 +2635,47 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
}
}
- public ControllerServiceNode createControllerService(final String type, final boolean firstTimeAdded) {
- return controllerServiceProvider.createControllerService(type, firstTimeAdded);
+ public ControllerServiceNode createControllerService(final String type, final Availability availability, final boolean firstTimeAdded) {
+ if ( availability == null ) {
+ throw new NullPointerException("availability is null");
+ }
+ if ( availability == Availability.NCM ) {
+ throw new IllegalArgumentException("Cannot create Controller Service with Availability 'NCM' on a standalone instance or a Node");
+ }
+
+ return controllerServiceProvider.createControllerService(type, availability, firstTimeAdded);
}
@Override
- public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
- return controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+ public ControllerServiceNode createControllerService(final String type, final String id, final Availability availability, final boolean firstTimeAdded) {
+ if ( availability == null ) {
+ throw new NullPointerException("availability is null");
+ }
+ if ( availability == Availability.NCM ) {
+ throw new IllegalArgumentException("Cannot create Controller Service with Availability 'NCM' on a standalone instance or a Node");
+ }
+
+ return controllerServiceProvider.createControllerService(type, id, availability, firstTimeAdded);
}
public void enableReportingTask(final ReportingTaskNode reportingTaskNode) {
- if ( reportingTaskNode.getAvailability() == Availability.CLUSTER_MANAGER_ONLY ) {
- reportingTaskNode.setScheduledState(ScheduledState.STOPPED); // updated scheduled state to keep state in sync across cluster
- return;
- }
-
reportingTaskNode.verifyCanEnable();
processScheduler.enableReportingTask(reportingTaskNode);
}
public void disableReportingTask(final ReportingTaskNode reportingTaskNode) {
- if ( reportingTaskNode.getAvailability() == Availability.CLUSTER_MANAGER_ONLY ) {
- reportingTaskNode.setScheduledState(ScheduledState.DISABLED); // updated scheduled state to keep state in sync across cluster
- return;
- }
-
reportingTaskNode.verifyCanDisable();
processScheduler.disableReportingTask(reportingTaskNode);
}
@Override
public void enableControllerService(final ControllerServiceNode serviceNode) {
- if ( serviceNode.getAvailability() == Availability.CLUSTER_MANAGER_ONLY ) {
- serviceNode.setDisabled(false); // set the disabled flag so that we can keep in sync with cluster
- return;
- }
-
serviceNode.verifyCanEnable();
controllerServiceProvider.enableControllerService(serviceNode);
}
@Override
public void disableControllerService(final ControllerServiceNode serviceNode) {
- if ( serviceNode.getAvailability() == Availability.CLUSTER_MANAGER_ONLY ) {
- serviceNode.setDisabled(true); // set the disabled flag so that we can keep in sync with cluster
- return;
- }
-
serviceNode.verifyCanDisable();
controllerServiceProvider.disableControllerService(serviceNode);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
index 1f483ed..18e2d8c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
@@ -83,13 +83,13 @@ public class StandardFlowSerializer implements FlowSerializer {
final Element controllerServicesNode = doc.createElement("controllerServices");
rootNode.appendChild(controllerServicesNode);
for ( final ControllerServiceNode serviceNode : controller.getAllControllerServices() ) {
- addControllerService(controllerServicesNode, serviceNode);
+ addControllerService(controllerServicesNode, serviceNode, encryptor);
}
final Element reportingTasksNode = doc.createElement("reportingTasks");
rootNode.appendChild(reportingTasksNode);
for ( final ReportingTaskNode taskNode : controller.getReportingTasks() ) {
- addReportingTask(reportingTasksNode, taskNode);
+ addReportingTask(reportingTasksNode, taskNode, encryptor);
}
final DOMSource domSource = new DOMSource(doc);
@@ -312,14 +312,14 @@ public class StandardFlowSerializer implements FlowSerializer {
addTextElement(element, "schedulingStrategy", processor.getSchedulingStrategy().name());
addTextElement(element, "runDurationNanos", processor.getRunDuration(TimeUnit.NANOSECONDS));
- addConfiguration(element, processor.getProperties(), processor.getAnnotationData());
+ addConfiguration(element, processor.getProperties(), processor.getAnnotationData(), encryptor);
for (final Relationship rel : processor.getAutoTerminatedRelationships()) {
addTextElement(element, "autoTerminatedRelationship", rel.getName());
}
}
- private void addConfiguration(final Element element, final Map<PropertyDescriptor, String> properties, final String annotationData) {
+ private static void addConfiguration(final Element element, final Map<PropertyDescriptor, String> properties, final String annotationData, final StringEncryptor encryptor) {
final Document doc = element.getOwnerDocument();
for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
final PropertyDescriptor descriptor = entry.getKey();
@@ -406,7 +406,7 @@ public class StandardFlowSerializer implements FlowSerializer {
}
- private void addControllerService(final Element element, final ControllerServiceNode serviceNode) {
+ public static void addControllerService(final Element element, final ControllerServiceNode serviceNode, final StringEncryptor encryptor) {
final Element serviceElement = element.getOwnerDocument().createElement("controllerService");
addTextElement(serviceElement, "id", serviceNode.getIdentifier());
addTextElement(serviceElement, "name", serviceNode.getName());
@@ -415,12 +415,12 @@ public class StandardFlowSerializer implements FlowSerializer {
addTextElement(serviceElement, "enabled", String.valueOf(!serviceNode.isDisabled()));
addTextElement(serviceElement, "availability", serviceNode.getAvailability().toString());
- addConfiguration(serviceElement, serviceNode.getProperties(), serviceNode.getAnnotationData());
+ addConfiguration(serviceElement, serviceNode.getProperties(), serviceNode.getAnnotationData(), encryptor);
element.appendChild(serviceElement);
}
- private void addReportingTask(final Element element, final ReportingTaskNode taskNode) {
+ public static void addReportingTask(final Element element, final ReportingTaskNode taskNode, final StringEncryptor encryptor) {
final Element taskElement = element.getOwnerDocument().createElement("reportingTask");
addTextElement(taskElement, "id", taskNode.getIdentifier());
addTextElement(taskElement, "name", taskNode.getName());
@@ -431,16 +431,16 @@ public class StandardFlowSerializer implements FlowSerializer {
addTextElement(taskElement, "schedulingStrategy", taskNode.getSchedulingStrategy().name());
addTextElement(taskElement, "availability", taskNode.getAvailability().toString());
- addConfiguration(taskElement, taskNode.getProperties(), taskNode.getAnnotationData());
+ addConfiguration(taskElement, taskNode.getProperties(), taskNode.getAnnotationData(), encryptor);
element.appendChild(taskElement);
}
- private void addTextElement(final Element element, final String name, final long value) {
+ private static void addTextElement(final Element element, final String name, final long value) {
addTextElement(element, name, String.valueOf(value));
}
- private void addTextElement(final Element element, final String name, final String value) {
+ private static void addTextElement(final Element element, final String name, final String value) {
final Document doc = element.getOwnerDocument();
final Element toAdd = doc.createElement(name);
toAdd.setTextContent(value);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 0619793..f9279a9 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -347,9 +347,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
private void addControllerService(final FlowController controller, final Element controllerServiceElement, final StringEncryptor encryptor) {
final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
- final ControllerServiceNode node = controller.createControllerService(dto.getType(), dto.getId(), false);
+ final ControllerServiceNode node = controller.createControllerService(dto.getType(), dto.getId(), Availability.valueOf(dto.getAvailability().toUpperCase()), false);
node.setName(dto.getName());
- node.setAvailability(Availability.valueOf(dto.getAvailability()));
node.setComments(dto.getComments());
node.setAnnotationData(dto.getAnnotationData());
@@ -392,10 +391,9 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
private void addReportingTask(final FlowController controller, final Element reportingTaskElement, final StringEncryptor encryptor) throws ReportingTaskInstantiationException {
final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(reportingTaskElement, encryptor);
- final ReportingTaskNode reportingTask = controller.createReportingTask(dto.getType(), dto.getId(), false);
+ final ReportingTaskNode reportingTask = controller.createReportingTask(dto.getType(), dto.getId(), Availability.valueOf(dto.getAvailability().toUpperCase()), false);
reportingTask.setName(dto.getName());
reportingTask.setComments(dto.getComment());
- reportingTask.setAvailability(Availability.valueOf(dto.getAvailability()));
reportingTask.setScheduldingPeriod(dto.getSchedulingPeriod());
reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(dto.getSchedulingStrategy()));
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
index 8a131d0..ed72618 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
@@ -42,31 +42,27 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
private final ReportingTask reportingTask;
private final ProcessScheduler processScheduler;
private final ControllerServiceLookup serviceLookup;
+ private final Availability availability;
private final AtomicReference<SchedulingStrategy> schedulingStrategy = new AtomicReference<>(SchedulingStrategy.TIMER_DRIVEN);
private final AtomicReference<String> schedulingPeriod = new AtomicReference<>("5 mins");
- private final AtomicReference<Availability> availability = new AtomicReference<>(Availability.NODE_ONLY);
private volatile String comment;
private volatile ScheduledState scheduledState = ScheduledState.STOPPED;
- public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id,
+ public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id, final Availability availability,
final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler,
final ValidationContextFactory validationContextFactory) {
super(reportingTask, id, validationContextFactory, controllerServiceProvider);
this.reportingTask = reportingTask;
this.processScheduler = processScheduler;
this.serviceLookup = controllerServiceProvider;
+ this.availability = availability;
}
@Override
public Availability getAvailability() {
- return availability.get();
- }
-
- @Override
- public void setAvailability(final Availability availability) {
- this.availability.set(availability);
+ return availability;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
index fe3af92..dd83c61 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.controller.reporting;
+import org.apache.nifi.controller.Availability;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ReportingTaskNode;
@@ -29,7 +30,7 @@ public class StandardReportingTaskNode extends AbstractReportingTaskNode impleme
public StandardReportingTaskNode(final ReportingTask reportingTask, final String id, final FlowController controller,
final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory) {
- super(reportingTask, id, controller, processScheduler, validationContextFactory);
+ super(reportingTask, id, Availability.NODE, controller, processScheduler, validationContextFactory);
this.flowController = controller;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index db44b5f..1ca6e54 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@ -36,6 +36,7 @@ import javax.xml.validation.SchemaFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.nifi.controller.Availability;
import org.apache.nifi.util.DomUtils;
import org.apache.nifi.util.file.FileUtils;
import org.w3c.dom.Document;
@@ -124,9 +125,12 @@ public class ControllerServiceLoader {
//and schedulingPeriod must be set
final String serviceId = DomUtils.getChild(serviceElement, "identifier").getTextContent().trim();
final String serviceClass = DomUtils.getChild(serviceElement, "class").getTextContent().trim();
-
+ final String availabilityName = DomUtils.getChild(serviceElement, "availability").getTextContent().trim();
+
+ final Availability availability = Availability.valueOf(availabilityName);
+
//set the class to be used for the configured controller task
- final ControllerServiceNode serviceNode = provider.createControllerService(serviceClass, serviceId, false);
+ final ControllerServiceNode serviceNode = provider.createControllerService(serviceClass, serviceId, availability, false);
//optional task-specific properties
for (final Element optionalProperty : DomUtils.getChildElementsByTagName(serviceElement, "property")) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index dd06f40..be0e7d6 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -19,7 +19,6 @@ package org.apache.nifi.controller.service;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -40,8 +39,8 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
private final ControllerService proxedControllerService;
private final ControllerService implementation;
private final ControllerServiceProvider serviceProvider;
+ private final Availability availability;
- private final AtomicReference<Availability> availability = new AtomicReference<>(Availability.NODE_ONLY);
private final AtomicBoolean disabled = new AtomicBoolean(true);
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
@@ -52,11 +51,12 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
private String comment;
public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id,
- final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
+ final Availability availability, final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
super(proxiedControllerService, id, validationContextFactory, serviceProvider);
this.proxedControllerService = proxiedControllerService;
this.implementation = implementation;
this.serviceProvider = serviceProvider;
+ this.availability = availability;
}
@Override
@@ -83,13 +83,9 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
@Override
public Availability getAvailability() {
- return availability.get();
+ return availability;
}
- @Override
- public void setAvailability(final Availability availability) {
- this.availability.set(availability);
- }
@Override
public ControllerService getProxiedControllerService() {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 3ec4755..fd97da7 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -36,6 +36,7 @@ import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
+import org.apache.nifi.controller.Availability;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ValidationContextFactory;
@@ -98,12 +99,12 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
@Override
- public ControllerServiceNode createControllerService(final String type, final boolean firstTimeAdded) {
- return createControllerService(type, UUID.randomUUID().toString(), firstTimeAdded);
+ public ControllerServiceNode createControllerService(final String type, final Availability availability, final boolean firstTimeAdded) {
+ return createControllerService(type, UUID.randomUUID().toString(), availability, firstTimeAdded);
}
@Override
- public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
+ public ControllerServiceNode createControllerService(final String type, final String id, final Availability availability, final boolean firstTimeAdded) {
if (type == null || id == null) {
throw new NullPointerException();
}
@@ -147,7 +148,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this);
- final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, originalService, id, validationContextFactory, this);
+ final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, originalService, id, availability, validationContextFactory, this);
serviceNodeHolder.set(serviceNode);
serviceNode.setName(rawClass.getSimpleName());
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/52149d85/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index 40d3b1b..f8852d9 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -43,6 +43,7 @@ import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.Availability;
import org.apache.nifi.controller.ContentAvailability;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.Counter;
@@ -404,13 +405,13 @@ public class ControllerFacade implements ControllerServiceProvider {
return flowController.getControllerService(serviceIdentifier);
}
- public ControllerServiceNode createControllerService(final String type, final boolean firstTimeAdded) {
- return flowController.createControllerService(type, firstTimeAdded);
+ public ControllerServiceNode createControllerService(final String type, final Availability availability, final boolean firstTimeAdded) {
+ return flowController.createControllerService(type, availability, firstTimeAdded);
}
@Override
- public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
- return flowController.createControllerService(type, id, firstTimeAdded);
+ public ControllerServiceNode createControllerService(final String type, final String id, final Availability availability, final boolean firstTimeAdded) {
+ return flowController.createControllerService(type, id, availability, firstTimeAdded);
}
public void removeControllerService(ControllerServiceNode serviceNode) {
[7/7] incubator-nifi git commit: Load Controller Services from
flow.tar file instead of an external file
Posted by ma...@apache.org.
Load Controller Services from flow.tar file instead of an external file
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/4ae2a10e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/4ae2a10e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/4ae2a10e
Branch: refs/heads/NIFI-250
Commit: 4ae2a10e6d130353ea34aa403d3bcbad7a2ab4e8
Parents: 52149d8
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 30 13:01:00 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 30 13:01:00 2015 -0500
----------------------------------------------------------------------
.../cluster/manager/impl/WebClusterManager.java | 42 ++++------
.../spring/WebClusterManagerFactoryBean.java | 6 --
.../service/ControllerServiceLoader.java | 88 ++++++--------------
.../nifi/persistence/FlowConfigurationDAO.java | 12 ---
.../StandardXMLFlowConfigurationDAO.java | 11 +--
5 files changed, 41 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4ae2a10e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 50d7087..d3cf6a1 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -16,13 +16,12 @@
*/
package org.apache.nifi.cluster.manager.impl;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.net.URI;
-import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -55,7 +54,6 @@ import javax.net.ssl.SSLContext;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.StreamingOutput;
-import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
@@ -65,9 +63,6 @@ import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
-import javax.xml.validation.Validator;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.admin.service.AuditService;
@@ -140,6 +135,7 @@ import org.apache.nifi.controller.reporting.StandardReportingInitializationConte
import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
+import org.apache.nifi.controller.service.ControllerServiceLoader;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
@@ -444,14 +440,20 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
// load flow
+ final ClusterDataFlow clusterDataFlow;
if (dataFlowManagementService.isFlowCurrent()) {
- final ClusterDataFlow clusterDataFlow = dataFlowManagementService.loadDataFlow();
+ clusterDataFlow = dataFlowManagementService.loadDataFlow();
cachedDataFlow = clusterDataFlow.getDataFlow();
primaryNodeId = clusterDataFlow.getPrimaryNodeId();
} else {
throw new IOException("Flow is not current.");
}
+ final byte[] serializedServices = clusterDataFlow.getControllerServices();
+ if ( serializedServices != null && serializedServices.length > 0 ) {
+ ControllerServiceLoader.loadControllerServices(this, new ByteArrayInputStream(serializedServices));
+ }
+
// start multicast broadcasting service, if configured
if (servicesBroadcaster != null) {
servicesBroadcaster.start();
@@ -461,8 +463,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
executeSafeModeTask();
// Load and start running Reporting Tasks
- final File taskFile = new File(properties.getProperty(NiFiProperties.TASK_CONFIGURATION_FILE));
- reportingTasks.putAll(loadReportingTasks(taskFile));
+ reportingTasks.putAll(loadReportingTasks(clusterDataFlow.getReportingTasks()));
} catch (final IOException ioe) {
logger.warn("Failed to initialize cluster services due to: " + ioe, ioe);
stop();
@@ -876,16 +877,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
reconnectionThread.start();
}
- private Map<String, ReportingTaskNode> loadReportingTasks(final File taskConfigXml) {
+ private Map<String, ReportingTaskNode> loadReportingTasks(final byte[] serialized) {
final Map<String, ReportingTaskNode> tasks = new HashMap<>();
- if (taskConfigXml == null) {
- logger.info("No controller tasks to start");
- return tasks;
- }
try {
- final URL schemaUrl = getClass().getResource("/ReportingTaskConfiguration.xsd");
- final Document document = parse(taskConfigXml, schemaUrl);
+ final Document document = parse(serialized);
final NodeList tasksNodes = document.getElementsByTagName("tasks");
final Element tasksElement = (Element) tasksNodes.item(0);
@@ -957,7 +953,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
tasks.put(reportingTaskNode.getIdentifier(), reportingTaskNode);
}
} catch (final SAXException | ParserConfigurationException | IOException | DOMException | NumberFormatException | InitializationException t) {
- logger.error("Unable to load reporting tasks from {} due to {}", new Object[]{taskConfigXml, t});
+ logger.error("Unable to load reporting tasks due to {}", new Object[]{t});
if (logger.isDebugEnabled()) {
logger.error("", t);
}
@@ -999,11 +995,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
return taskNode;
}
- private Document parse(final File xmlFile, final URL schemaUrl) throws SAXException, ParserConfigurationException, IOException {
- final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
- final Schema schema = schemaFactory.newSchema(schemaUrl);
+ private Document parse(final byte[] serialized) throws SAXException, ParserConfigurationException, IOException {
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
- docFactory.setSchema(schema);
final DocumentBuilder builder = docFactory.newDocumentBuilder();
builder.setErrorHandler(new org.xml.sax.ErrorHandler() {
@@ -1036,12 +1029,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
});
// build the docuemnt
- final Document document = builder.parse(xmlFile);
-
- // ensure schema compliance
- final Validator validator = schema.newValidator();
- validator.validate(new DOMSource(document));
-
+ final Document document = builder.parse(new ByteArrayInputStream(serialized));
return document;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4ae2a10e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
index 7169730..3881461 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
@@ -16,7 +16,6 @@
*/
package org.apache.nifi.cluster.spring;
-import java.nio.file.Paths;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.cluster.event.EventManager;
import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
@@ -26,7 +25,6 @@ import org.apache.nifi.cluster.manager.HttpResponseMapper;
import org.apache.nifi.cluster.manager.impl.WebClusterManager;
import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderListener;
import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster;
-import org.apache.nifi.controller.service.ControllerServiceLoader;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.io.socket.multicast.DiscoverableService;
import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
@@ -106,10 +104,6 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
// set the audit service
clusterManager.setAuditService(applicationContext.getBean("auditService", AuditService.class));
-
- // load the controller services
- final ControllerServiceLoader serviceLoader = new ControllerServiceLoader(Paths.get(serviceConfigurationFile));
- serviceLoader.loadControllerServices(clusterManager);
}
return clusterManager;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4ae2a10e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index 1ca6e54..01114e5 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@ -17,31 +17,21 @@
package org.apache.nifi.controller.service;
import java.io.BufferedInputStream;
-import java.io.File;
import java.io.IOException;
import java.io.InputStream;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
-import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.nifi.controller.Availability;
import org.apache.nifi.util.DomUtils;
-import org.apache.nifi.util.file.FileUtils;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
import org.xml.sax.SAXParseException;
@@ -52,33 +42,14 @@ public class ControllerServiceLoader {
private static final Log logger = LogFactory.getLog(ControllerServiceLoader.class);
- private final Path serviceConfigXmlPath;
- public ControllerServiceLoader(final Path serviceConfigXmlPath) throws IOException {
- final File serviceConfigXmlFile = serviceConfigXmlPath.toFile();
- if (!serviceConfigXmlFile.exists() || !serviceConfigXmlFile.canRead()) {
- throw new IOException(serviceConfigXmlPath + " does not appear to exist or cannot be read. Cannot load configuration.");
- }
-
- this.serviceConfigXmlPath = serviceConfigXmlPath;
- }
-
- public List<ControllerServiceNode> loadControllerServices(final ControllerServiceProvider provider) throws IOException {
- final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+ public static List<ControllerServiceNode> loadControllerServices(final ControllerServiceProvider provider, final InputStream serializedStream) throws IOException {
final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
- InputStream fis = null;
- BufferedInputStream bis = null;
documentBuilderFactory.setNamespaceAware(true);
final List<ControllerServiceNode> services = new ArrayList<>();
- try {
- final URL configurationResource = this.getClass().getResource("/ControllerServiceConfiguration.xsd");
- if (configurationResource == null) {
- throw new NullPointerException("Unable to load XML Schema for ControllerServiceConfiguration");
- }
- final Schema schema = schemaFactory.newSchema(configurationResource);
- documentBuilderFactory.setSchema(schema);
+ try (final InputStream in = new BufferedInputStream(serializedStream)) {
final DocumentBuilder builder = documentBuilderFactory.newDocumentBuilder();
builder.setErrorHandler(new org.xml.sax.ErrorHandler() {
@@ -112,42 +83,33 @@ public class ControllerServiceLoader {
});
//if controllerService.xml does not exist, create an empty file...
- fis = Files.newInputStream(this.serviceConfigXmlPath, StandardOpenOption.READ);
- bis = new BufferedInputStream(fis);
- if (Files.size(this.serviceConfigXmlPath) > 0) {
- final Document document = builder.parse(bis);
- final NodeList servicesNodes = document.getElementsByTagName("services");
- final Element servicesElement = (Element) servicesNodes.item(0);
-
- final List<Element> serviceNodes = DomUtils.getChildElementsByTagName(servicesElement, "service");
- for (final Element serviceElement : serviceNodes) {
- //get properties for the specific controller task - id, name, class,
- //and schedulingPeriod must be set
- final String serviceId = DomUtils.getChild(serviceElement, "identifier").getTextContent().trim();
- final String serviceClass = DomUtils.getChild(serviceElement, "class").getTextContent().trim();
- final String availabilityName = DomUtils.getChild(serviceElement, "availability").getTextContent().trim();
-
- final Availability availability = Availability.valueOf(availabilityName);
-
- //set the class to be used for the configured controller task
- final ControllerServiceNode serviceNode = provider.createControllerService(serviceClass, serviceId, availability, false);
-
- //optional task-specific properties
- for (final Element optionalProperty : DomUtils.getChildElementsByTagName(serviceElement, "property")) {
- final String name = optionalProperty.getAttribute("name").trim();
- final String value = optionalProperty.getTextContent().trim();
- serviceNode.setProperty(name, value);
- }
-
- services.add(serviceNode);
- provider.enableControllerService(serviceNode);
+ final Document document = builder.parse(in);
+ final Element controllerServices = DomUtils.getChild(document.getDocumentElement(), "controllerServices");
+ final List<Element> serviceNodes = DomUtils.getChildElementsByTagName(controllerServices, "controllerService");
+ for (final Element serviceElement : serviceNodes) {
+ //get properties for the specific controller task - id, name, class,
+ //and schedulingPeriod must be set
+ final String serviceId = DomUtils.getChild(serviceElement, "id").getTextContent().trim();
+ final String serviceClass = DomUtils.getChild(serviceElement, "class").getTextContent().trim();
+ final String availabilityName = DomUtils.getChild(serviceElement, "availability").getTextContent().trim();
+
+ final Availability availability = Availability.valueOf(availabilityName);
+
+ //set the class to be used for the configured controller task
+ final ControllerServiceNode serviceNode = provider.createControllerService(serviceClass, serviceId, availability, false);
+
+ //optional task-specific properties
+ for (final Element optionalProperty : DomUtils.getChildElementsByTagName(serviceElement, "property")) {
+ final String name = optionalProperty.getAttribute("name").trim();
+ final String value = optionalProperty.getTextContent().trim();
+ serviceNode.setProperty(name, value);
}
+
+ services.add(serviceNode);
+ provider.enableControllerService(serviceNode);
}
} catch (SAXException | ParserConfigurationException sxe) {
throw new IOException(sxe);
- } finally {
- FileUtils.closeQuietly(fis);
- FileUtils.closeQuietly(bis);
}
return services;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4ae2a10e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java
index 4f3afaf..8cab916 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java
@@ -27,7 +27,6 @@ import org.apache.nifi.controller.FlowSerializationException;
import org.apache.nifi.controller.FlowSynchronizationException;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.UninheritableFlowException;
-import org.apache.nifi.controller.service.ControllerServiceNode;
/**
* Interface to define service methods for FlowController configuration.
@@ -121,15 +120,4 @@ public interface FlowConfigurationDAO {
*/
List<ReportingTaskNode> loadReportingTasks(FlowController controller) throws IOException;
- /**
- * Instantiates all controller services from the file used in the
- * constructor
- *
- * @param controller
- * @return
- * @throws java.io.IOException
- * @returns all of the ReportingTasks that were instantiated & scheduled
- */
- List<ControllerServiceNode> loadControllerServices(FlowController controller) throws IOException;
-
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4ae2a10e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
index 039b2c2..3000869 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
@@ -54,10 +54,7 @@ import org.apache.nifi.controller.StandardFlowSynchronizer;
import org.apache.nifi.controller.UninheritableFlowException;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
-import org.apache.nifi.controller.service.ControllerServiceLoader;
-import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.encrypt.StringEncryptor;
-import org.apache.nifi.util.file.FileUtils;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingInitializationContext;
@@ -65,7 +62,7 @@ import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.DomUtils;
import org.apache.nifi.util.NiFiProperties;
-
+import org.apache.nifi.util.file.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.DOMException;
@@ -81,7 +78,6 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD
private final Path flowXmlPath;
private final Path taskConfigXmlPath;
- private final ControllerServiceLoader servicerLoader;
private final StringEncryptor encryptor;
private static final Logger LOG = LoggerFactory.getLogger(StandardXMLFlowConfigurationDAO.class);
@@ -103,7 +99,6 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD
this.flowXmlPath = flowXml;
this.taskConfigXmlPath = taskConfigXml;
- this.servicerLoader = new ControllerServiceLoader(serviceConfigXml);
this.encryptor = encryptor;
}
@@ -292,10 +287,6 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD
return tasks;
}
- @Override
- public List<ControllerServiceNode> loadControllerServices(final FlowController controller) throws IOException {
- return servicerLoader.loadControllerServices(controller);
- }
private Document parse(final File xmlFile, final URL schemaUrl) throws SAXException, ParserConfigurationException, IOException {
final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
[4/7] incubator-nifi git commit: NIFI-250: Updated controller
services to use appropriate defaults and use .identifiesControllerService
instead of using the old way of obtaining controller services;
do not fail to startup if controller service is invalid
Posted by ma...@apache.org.
NIFI-250: Updated controller services to use appropriate defaults and use .identifiesControllerService instead of using the old way of obtaining controller services; do not fail to startup if controller service is invalid
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/3344cef3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/3344cef3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/3344cef3
Branch: refs/heads/NIFI-250
Commit: 3344cef3365dfc620f11aaf18fcfc8ba7d58e16a
Parents: 1682e47
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 30 09:05:58 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 30 09:05:58 2015 -0500
----------------------------------------------------------------------
.../controller/StandardFlowSynchronizer.java | 48 ++++++++++++++++++--
.../nifi/processors/standard/PostHTTP.java | 38 ++++++++++------
.../DistributedMapCacheClientService.java | 18 +++-----
.../DistributedSetCacheClientService.java | 11 ++---
.../cache/server/DistributedCacheServer.java | 9 ++--
.../nifi/ssl/StandardSSLContextService.java | 2 -
6 files changed, 82 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3344cef3/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 33650e1..0619793 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -100,9 +100,11 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
private static final Logger logger = LoggerFactory.getLogger(StandardFlowSynchronizer.class);
public static final URL FLOW_XSD_RESOURCE = StandardFlowSynchronizer.class.getResource("/FlowConfiguration.xsd");
private final StringEncryptor encryptor;
+ private final boolean autoResumeState;
public StandardFlowSynchronizer(final StringEncryptor encryptor) {
this.encryptor = encryptor;
+ autoResumeState = NiFiProperties.getInstance().getAutoResumeState();
}
public static boolean isEmpty(final DataFlow dataFlow, final StringEncryptor encryptor) {
@@ -349,7 +351,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
node.setName(dto.getName());
node.setAvailability(Availability.valueOf(dto.getAvailability()));
node.setComments(dto.getComments());
- node.setDisabled(dto.getEnabled() != Boolean.TRUE);
node.setAnnotationData(dto.getAnnotationData());
for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) {
@@ -360,9 +361,21 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
}
}
- if ( dto.getEnabled() == Boolean.TRUE ) {
- controller.enableControllerService(node);
- }
+ if ( autoResumeState ) {
+ if ( dto.getEnabled() == Boolean.TRUE ) {
+ try {
+ controller.enableControllerService(node);
+ } catch (final Exception e) {
+ logger.error("Failed to enable " + node + " due to " + e);
+ if ( logger.isDebugEnabled() ) {
+ logger.error("", e);
+ }
+
+ controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin(
+ "Controller Service", Severity.ERROR.name(), "Could not start " + node + " due to " + e));
+ }
+ }
+ }
}
private void updateControllerService(final FlowController controller, final Element controllerServiceElement, final StringEncryptor encryptor) {
@@ -384,7 +397,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
reportingTask.setComments(dto.getComment());
reportingTask.setAvailability(Availability.valueOf(dto.getAvailability()));
reportingTask.setScheduldingPeriod(dto.getSchedulingPeriod());
- reportingTask.setScheduledState(ScheduledState.valueOf(dto.getScheduledState()));
reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(dto.getSchedulingStrategy()));
reportingTask.setAnnotationData(dto.getAnnotationData());
@@ -396,6 +408,32 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
reportingTask.setProperty(entry.getKey(), entry.getValue());
}
}
+
+ if ( autoResumeState ) {
+ if ( ScheduledState.RUNNING.name().equals(dto.getScheduledState()) ) {
+ try {
+ controller.startReportingTask(reportingTask);
+ } catch (final Exception e) {
+ logger.error("Failed to start {} due to {}", reportingTask, e);
+ if ( logger.isDebugEnabled() ) {
+ logger.error("", e);
+ }
+ controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin(
+ "Reporting Tasks", Severity.ERROR.name(), "Failed to start " + reportingTask + " due to " + e));
+ }
+ } else if ( ScheduledState.DISABLED.name().equals(dto.getScheduledState()) ) {
+ try {
+ controller.disableReportingTask(reportingTask);
+ } catch (final Exception e) {
+ logger.error("Failed to mark {} as disabled due to {}", reportingTask, e);
+ if ( logger.isDebugEnabled() ) {
+ logger.error("", e);
+ }
+ controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin(
+ "Reporting Tasks", Severity.ERROR.name(), "Failed to mark " + reportingTask + " as disabled due to " + e));
+ }
+ }
+ }
}
private void updateReportingTask(final FlowController controller, final Element reportingTaskElement, final StringEncryptor encryptor) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3344cef3/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
index fd486b0..ff7475b 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
@@ -68,6 +68,7 @@ import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.conn.ManagedHttpClientConnection;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLContextBuilder;
import org.apache.http.conn.ssl.SSLContexts;
import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
import org.apache.http.entity.ContentProducer;
@@ -352,22 +353,29 @@ public class PostHTTP extends AbstractProcessor {
private SSLContext createSSLContext(final SSLContextService service) throws KeyStoreException, IOException, NoSuchAlgorithmException,
CertificateException, KeyManagementException, UnrecoverableKeyException
{
- final KeyStore truststore = KeyStore.getInstance(service.getTrustStoreType());
- try (final InputStream in = new FileInputStream(new File(service.getTrustStoreFile()))) {
- truststore.load(in, service.getTrustStorePassword().toCharArray());
- }
-
- final KeyStore keystore = KeyStore.getInstance(service.getKeyStoreType());
- try (final InputStream in = new FileInputStream(new File(service.getKeyStoreFile()))) {
- keystore.load(in, service.getKeyStorePassword().toCharArray());
- }
-
- SSLContext sslContext = SSLContexts.custom()
- .loadTrustMaterial(truststore, new TrustSelfSignedStrategy())
- .loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray())
- .build();
+ final SSLContextBuilder builder = SSLContexts.custom();
+
+ final String trustStoreFilename = service.getTrustStoreFile();
+ if ( trustStoreFilename != null ) {
+ final KeyStore truststore = KeyStore.getInstance(service.getTrustStoreType());
+ try (final InputStream in = new FileInputStream(new File(trustStoreFilename))) {
+ truststore.load(in, service.getTrustStorePassword().toCharArray());
+ }
+
+ builder.loadTrustMaterial(truststore, new TrustSelfSignedStrategy());
+ }
- return sslContext;
+ final String keyStoreFilename = service.getKeyStoreFile();
+ if ( keyStoreFilename != null ) {
+ final KeyStore keystore = KeyStore.getInstance(service.getKeyStoreType());
+ try (final InputStream in = new FileInputStream(new File(service.getKeyStoreFile()))) {
+ keystore.load(in, service.getKeyStorePassword().toCharArray());
+ }
+
+ builder.loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray());
+ }
+
+ return builder.build();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3344cef3/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
index 6dad80b..956ed16 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
@@ -59,16 +59,15 @@ public class DistributedMapCacheClientService extends AbstractControllerService
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
- .description(
- "If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted")
+ .description("If specified, indicates the SSL Context Service that is used to communicate with the "
+ + "remote server. If not specified, communications will not be encrypted")
.required(false)
- .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
- .defaultValue(null)
+ .identifiesControllerService(SSLContextService.class)
.build();
public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder()
.name("Communications Timeout")
- .description(
- "Specifies how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received")
+ .description("Specifies how long to wait when communicating with the remote server before determining that "
+ + "there is a communications failure if data cannot be sent or received")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("30 secs")
@@ -94,8 +93,7 @@ public class DistributedMapCacheClientService extends AbstractControllerService
}
@Override
- public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer)
- throws IOException {
+ public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
return withCommsSession(new CommsAction<Boolean>() {
@Override
public Boolean execute(final CommsSession session) throws IOException {
@@ -131,8 +129,7 @@ public class DistributedMapCacheClientService extends AbstractControllerService
}
@Override
- public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
- final Deserializer<V> valueDeserializer) throws IOException {
+ public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Deserializer<V> valueDeserializer) throws IOException {
return withCommsSession(new CommsAction<V>() {
@Override
public V execute(final CommsSession session) throws IOException {
@@ -297,7 +294,6 @@ public class DistributedMapCacheClientService extends AbstractControllerService
}
private static interface CommsAction<T> {
-
T execute(CommsSession commsSession) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3344cef3/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
index dcc5558..67a2d43 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
@@ -59,16 +59,15 @@ public class DistributedSetCacheClientService extends AbstractControllerService
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
- .description(
- "If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted")
+ .description("If specified, indicates the SSL Context Service that is used to communicate with the "
+ + "remote server. If not specified, communications will not be encrypted")
.required(false)
- .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
- .defaultValue(null)
+ .identifiesControllerService(SSLContextService.class)
.build();
public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder()
.name("Communications Timeout")
- .description(
- "Specifices how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received")
+ .description("Specifices how long to wait when communicating with the remote server before determining "
+ + "that there is a communications failure if data cannot be sent or received")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("30 secs")
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3344cef3/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
index 0817479..66b73e2 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
@@ -42,10 +42,10 @@ public abstract class DistributedCacheServer extends AbstractControllerService {
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
- .description(
- "If specified, this service will be used to create an SSL Context that will be used to secure communications; if not specified, communications will not be secure")
+ .description("If specified, this service will be used to create an SSL Context that will be used "
+ + "to secure communications; if not specified, communications will not be secure")
.required(false)
- .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
+ .identifiesControllerService(SSLContextService.class)
.build();
public static final PropertyDescriptor MAX_CACHE_ENTRIES = new PropertyDescriptor.Builder()
.name("Maximum Cache Entries")
@@ -77,8 +77,7 @@ public abstract class DistributedCacheServer extends AbstractControllerService {
properties.add(MAX_CACHE_ENTRIES);
properties.add(EVICTION_POLICY);
properties.add(PERSISTENCE_PATH);
- properties.add(new PropertyDescriptor.Builder().fromPropertyDescriptor(SSL_CONTEXT_SERVICE).allowableValues(
- getControllerServiceLookup().getControllerServiceIdentifiers(SSLContextService.class)).build());
+ properties.add(SSL_CONTEXT_SERVICE);
return properties;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3344cef3/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java
index ae2e19c..39bb5fb 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java
@@ -62,7 +62,6 @@ public class StandardSSLContextService extends AbstractControllerService impleme
.description("The Type of the Truststore. Either JKS or PKCS12")
.allowableValues(STORE_TYPE_JKS, STORE_TYPE_PKCS12)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .defaultValue(STORE_TYPE_JKS)
.sensitive(false)
.build();
public static final PropertyDescriptor TRUSTSTORE_PASSWORD = new PropertyDescriptor.Builder()
@@ -84,7 +83,6 @@ public class StandardSSLContextService extends AbstractControllerService impleme
.description("The Type of the Keystore")
.allowableValues(STORE_TYPE_JKS, STORE_TYPE_PKCS12)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .defaultValue(STORE_TYPE_JKS)
.sensitive(false)
.build();
public static final PropertyDescriptor KEYSTORE_PASSWORD = new PropertyDescriptor.Builder()
[3/7] incubator-nifi git commit: Merge branch 'NIFI-250' of
http://git-wip-us.apache.org/repos/asf/incubator-nifi into NIFI-250
Posted by ma...@apache.org.
Merge branch 'NIFI-250' of http://git-wip-us.apache.org/repos/asf/incubator-nifi into NIFI-250
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/1682e47a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/1682e47a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/1682e47a
Branch: refs/heads/NIFI-250
Commit: 1682e47aa3820bca6ec80e6009a16d0572fb1a9b
Parents: fbc14e0 e7beef8
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 30 08:36:55 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 30 08:36:55 2015 -0500
----------------------------------------------------------------------
.../src/main/asciidoc/developer-guide.adoc | 2116 +++++++++++++++++-
1 file changed, 2098 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
[2/7] incubator-nifi git commit: Merge branch 'NIFI-250' of
http://git-wip-us.apache.org/repos/asf/incubator-nifi into NIFI-250
Posted by ma...@apache.org.
Merge branch 'NIFI-250' of http://git-wip-us.apache.org/repos/asf/incubator-nifi into NIFI-250
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/fbc14e0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/fbc14e0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/fbc14e0a
Branch: refs/heads/NIFI-250
Commit: fbc14e0a3a46c0511d6135e86797cfc14c81d7d5
Parents: 9a6acab 1ebaf1d
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Jan 29 16:26:08 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Jan 29 16:26:08 2015 -0500
----------------------------------------------------------------------
.../css/controller-service-configuration.css | 32 ++++++++
.../src/main/webapp/css/settings.css | 2 +-
.../nf-controller-service-configuration.js | 79 ++++++++++++++++----
3 files changed, 96 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
[5/7] incubator-nifi git commit: NIFI-250: Updated
ControllerServiceProvider to have a 'createControllerService' that takes no
identifier and randomly generates one
Posted by ma...@apache.org.
NIFI-250: Updated ControllerServiceProvider to have a 'createControllerService' that takes no identifier and randomly generates one
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/3102e083
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/3102e083
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/3102e083
Branch: refs/heads/NIFI-250
Commit: 3102e08378b34cb869c708faa05c427de6b68139
Parents: 3344cef
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 30 10:18:54 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 30 10:18:54 2015 -0500
----------------------------------------------------------------------
.../nifi/cluster/manager/impl/WebClusterManager.java | 3 ++-
.../controller/service/ControllerServiceProvider.java | 11 +++++++++++
.../java/org/apache/nifi/controller/FlowController.java | 2 +-
.../service/StandardControllerServiceProvider.java | 6 ++++++
4 files changed, 20 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3102e083/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 27620b5..9649fee 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -1294,8 +1294,9 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
}
+ @Override
public ControllerServiceNode createControllerService(final String type, final boolean firstTimeAdded) {
- return createControllerService(type, UUID.randomUUID().toString(), firstTimeAdded);
+ return controllerServiceProvider.createControllerService(type, firstTimeAdded);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3102e083/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
index 1bc3964..42d6609 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
@@ -27,10 +27,21 @@ import org.apache.nifi.controller.ControllerServiceLookup;
public interface ControllerServiceProvider extends ControllerServiceLookup {
/**
+ * Creates a new Controller Service of the specified type and assigns it a randomly generated ID. If <code>firstTimeAdded</code>
+ * is true, calls any methods that are annotated with {@link OnAdded}
+ *
+ * @param type
+ * @param firstTimeAdded
+ * @return
+ */
+ ControllerServiceNode createControllerService(String type, boolean firstTimeAdded);
+
+ /**
* Creates a new Controller Service of the specified type and assigns it the given id. If <code>firstTimeadded</code>
* is true, calls any methods that are annotated with {@link OnAdded}
*
* @param type
+ * @param id
* @param firstTimeAdded
* @return
*/
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3102e083/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 209a4d6..347d6b9 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2651,7 +2651,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
}
public ControllerServiceNode createControllerService(final String type, final boolean firstTimeAdded) {
- return createControllerService(type, UUID.randomUUID().toString(), firstTimeAdded);
+ return controllerServiceProvider.createControllerService(type, firstTimeAdded);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3102e083/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 0263ee0..3ec4755 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -28,6 +28,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -95,6 +96,11 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
populateInterfaces(superClass, interfacesDefinedThusFar);
}
}
+
+ @Override
+ public ControllerServiceNode createControllerService(final String type, final boolean firstTimeAdded) {
+ return createControllerService(type, UUID.randomUUID().toString(), firstTimeAdded);
+ }
@Override
public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {