You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/04/10 15:43:56 UTC
[21/62] [abbrv] incubator-nifi git commit: Squashed commit of the
following:
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 4d5455f..eff523a 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -16,12 +16,12 @@
*/
package org.apache.nifi.cluster.manager.impl;
-import java.io.File;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.net.URI;
-import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -41,7 +41,9 @@ import java.util.TimerTask;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -54,17 +56,20 @@ import javax.net.ssl.SSLContext;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.StreamingOutput;
-import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
-import javax.xml.validation.Validator;
+import javax.xml.transform.stream.StreamResult;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.admin.service.AuditService;
+import org.apache.nifi.annotation.lifecycle.OnAdded;
+import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.cluster.BulletinsPayload;
import org.apache.nifi.cluster.HeartbeatPayload;
import org.apache.nifi.cluster.context.ClusterContext;
@@ -122,12 +127,18 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.Heartbeater;
import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.StandardFlowSerializer;
import org.apache.nifi.controller.ValidationContextFactory;
+import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
+import org.apache.nifi.controller.reporting.ReportingTaskProvider;
import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
+import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
+import org.apache.nifi.controller.service.ControllerServiceLoader;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
@@ -147,10 +158,12 @@ import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.events.VolatileBulletinRepository;
import org.apache.nifi.framework.security.util.SslContextFactory;
import org.apache.nifi.io.socket.multicast.DiscoverableService;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.NiFiLog;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.nar.NarThreadContextClassLoader;
+import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.processor.StandardValidationContextFactory;
import org.apache.nifi.remote.RemoteResourceManager;
import org.apache.nifi.remote.RemoteSiteListener;
@@ -168,7 +181,11 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.DomUtils;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.ObjectHolder;
+import org.apache.nifi.util.ReflectionUtils;
+import org.apache.nifi.web.OptimisticLockingManager;
import org.apache.nifi.web.Revision;
+import org.apache.nifi.web.UpdateRevision;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
@@ -204,6 +221,16 @@ import org.xml.sax.SAXParseException;
import com.sun.jersey.api.client.ClientResponse;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
+import org.apache.nifi.web.api.dto.ReportingTaskDTO;
+import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
+import org.apache.nifi.web.api.entity.ControllerServicesEntity;
+import org.apache.nifi.web.api.entity.ReportingTaskEntity;
+import org.apache.nifi.web.api.entity.ReportingTasksEntity;
+
/**
* Provides a cluster manager implementation. The manager federates incoming
* HTTP client requests to the nodes' external API using the HTTP protocol. The
@@ -222,7 +249,7 @@ import com.sun.jersey.api.client.ClientResponse;
*
* @author unattributed
*/
-public class WebClusterManager implements HttpClusterManager, ProtocolHandler, ControllerServiceProvider {
+public class WebClusterManager implements HttpClusterManager, ProtocolHandler, ControllerServiceProvider, ReportingTaskProvider {
public static final String ROOT_GROUP_ID_ALIAS = "root";
public static final String BULLETIN_CATEGORY = "Clustering";
@@ -279,6 +306,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
public static final Pattern PROCESSORS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors");
public static final Pattern PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}");
+ public static final Pattern CLUSTER_PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/cluster/processors/[a-f0-9\\-]{36}");
public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups");
public static final Pattern REMOTE_PROCESS_GROUP_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups/[a-f0-9\\-]{36}");
@@ -290,12 +318,19 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
public static final String PROVENANCE_URI = "/nifi-api/controller/provenance";
public static final Pattern PROVENANCE_QUERY_URI = Pattern.compile("/nifi-api/controller/provenance/[a-f0-9\\-]{36}");
public static final Pattern PROVENANCE_EVENT_URI = Pattern.compile("/nifi-api/controller/provenance/events/[0-9]+");
-
+
+ public static final String CONTROLLER_SERVICES_URI = "/nifi-api/controller/controller-services/node";
+ public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}");
+ public static final Pattern CONTROLLER_SERVICE_REFERENCES_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}/references");
+ public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node";
+ public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}");
+
private final NiFiProperties properties;
private final HttpRequestReplicator httpRequestReplicator;
private final HttpResponseMapper httpResponseMapper;
private final DataFlowManagementService dataFlowManagementService;
private final ClusterManagerProtocolSenderListener senderListener;
+ private final OptimisticLockingManager optimisticLockingManager;
private final StringEncryptor encryptor;
private final Queue<Heartbeat> pendingHeartbeats = new ConcurrentLinkedQueue<>();
private final ReentrantReadWriteLock resourceRWLock = new ReentrantReadWriteLock();
@@ -303,12 +338,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
private final ClusterManagerLock writeLock = new ClusterManagerLock(resourceRWLock.writeLock(), "Write");
private final Set<Node> nodes = new HashSet<>();
- private final Set<ReportingTaskNode> reportingTasks = new HashSet<>();
+ private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = new ConcurrentHashMap<>();
// null means the dataflow should be read from disk
private StandardDataFlow cachedDataFlow = null;
private NodeIdentifier primaryNodeId = null;
- private Revision revision = new Revision(0L, "");
private Timer heartbeatMonitor;
private Timer heartbeatProcessor;
private volatile ClusterServicesBroadcaster servicesBroadcaster = null;
@@ -329,7 +363,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
public WebClusterManager(final HttpRequestReplicator httpRequestReplicator, final HttpResponseMapper httpResponseMapper,
final DataFlowManagementService dataFlowManagementService, final ClusterManagerProtocolSenderListener senderListener,
- final NiFiProperties properties, final StringEncryptor encryptor) {
+ final NiFiProperties properties, final StringEncryptor encryptor, final OptimisticLockingManager optimisticLockingManager) {
if (httpRequestReplicator == null) {
throw new IllegalArgumentException("HttpRequestReplicator may not be null.");
@@ -348,11 +382,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
this.httpResponseMapper = httpResponseMapper;
this.dataFlowManagementService = dataFlowManagementService;
this.properties = properties;
- this.controllerServiceProvider = new StandardControllerServiceProvider();
this.bulletinRepository = new VolatileBulletinRepository();
this.instanceId = UUID.randomUUID().toString();
this.senderListener = senderListener;
this.encryptor = encryptor;
+ this.optimisticLockingManager = optimisticLockingManager;
senderListener.addHandler(this);
senderListener.setBulletinRepository(bulletinRepository);
@@ -393,9 +427,15 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
public void heartbeat() {
}
}, this, encryptor);
+
+ // When we construct the scheduling agents, we can pass null for a lot of the arguments because we are only
+ // going to be scheduling Reporting Tasks. Otherwise, it would not be okay.
processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, new TimerDrivenSchedulingAgent(null, reportingTaskEngine, null, encryptor));
+ processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, new QuartzSchedulingAgent(null, reportingTaskEngine, null, encryptor));
processScheduler.setMaxThreadCount(SchedulingStrategy.TIMER_DRIVEN, 10);
processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 10);
+
+ controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository);
}
public void start() throws IOException {
@@ -429,14 +469,20 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
// load flow
+ final ClusterDataFlow clusterDataFlow;
if (dataFlowManagementService.isFlowCurrent()) {
- final ClusterDataFlow clusterDataFlow = dataFlowManagementService.loadDataFlow();
+ clusterDataFlow = dataFlowManagementService.loadDataFlow();
cachedDataFlow = clusterDataFlow.getDataFlow();
primaryNodeId = clusterDataFlow.getPrimaryNodeId();
} else {
throw new IOException("Flow is not current.");
}
+ final byte[] serializedServices = clusterDataFlow.getControllerServices();
+ if ( serializedServices != null && serializedServices.length > 0 ) {
+ ControllerServiceLoader.loadControllerServices(this, new ByteArrayInputStream(serializedServices), encryptor, bulletinRepository, properties.getAutoResumeState());
+ }
+
// start multicast broadcasting service, if configured
if (servicesBroadcaster != null) {
servicesBroadcaster.start();
@@ -446,8 +492,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
executeSafeModeTask();
// Load and start running Reporting Tasks
- final File taskFile = new File(properties.getProperty(NiFiProperties.TASK_CONFIGURATION_FILE));
- reportingTasks.addAll(loadReportingTasks(taskFile));
+ final byte[] serializedReportingTasks = clusterDataFlow.getReportingTasks();
+ if ( serializedReportingTasks != null && serializedReportingTasks.length > 0 ) {
+ loadReportingTasks(serializedReportingTasks);
+ }
} catch (final IOException ioe) {
logger.warn("Failed to initialize cluster services due to: " + ioe, ioe);
stop();
@@ -861,22 +909,17 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
reconnectionThread.start();
}
- private List<ReportingTaskNode> loadReportingTasks(final File taskConfigXml) {
- final List<ReportingTaskNode> tasks = new ArrayList<>();
- if (taskConfigXml == null) {
- logger.info("No controller tasks to start");
- return tasks;
- }
+ private Map<String, ReportingTaskNode> loadReportingTasks(final byte[] serialized) {
+ final Map<String, ReportingTaskNode> tasks = new HashMap<>();
try {
- final URL schemaUrl = getClass().getResource("/ReportingTaskConfiguration.xsd");
- final Document document = parse(taskConfigXml, schemaUrl);
+ final Document document = parse(serialized);
- final NodeList tasksNodes = document.getElementsByTagName("tasks");
+ final NodeList tasksNodes = document.getElementsByTagName("reportingTasks");
final Element tasksElement = (Element) tasksNodes.item(0);
//optional properties for all ReportingTasks
- for (final Element taskElement : DomUtils.getChildElementsByTagName(tasksElement, "task")) {
+ for (final Element taskElement : DomUtils.getChildElementsByTagName(tasksElement, "reportingTask")) {
//add global properties common to all tasks
Map<String, String> properties = new HashMap<>();
@@ -901,17 +944,20 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final String taskSchedulingPeriod = DomUtils.getChild(taskElement, "schedulingPeriod").getTextContent().trim();
final String taskClass = DomUtils.getChild(taskElement, "class").getTextContent().trim();
- //optional task-specific properties
- for (final Element optionalProperty : DomUtils.getChildElementsByTagName(taskElement, "property")) {
- final String name = optionalProperty.getAttribute("name");
- final String value = optionalProperty.getTextContent().trim();
+ final String scheduleStateValue = DomUtils.getChild(taskElement, "scheduledState").getTextContent().trim();
+ final ScheduledState scheduledState = ScheduledState.valueOf(scheduleStateValue);
+
+ // Reporting Task Properties
+ for (final Element property : DomUtils.getChildElementsByTagName(taskElement, "property")) {
+ final String name = DomUtils.getChildText(property, "name");
+ final String value = DomUtils.getChildText(property, "value");
properties.put(name, value);
}
//set the class to be used for the configured reporting task
final ReportingTaskNode reportingTaskNode;
try {
- reportingTaskNode = createReportingTask(taskClass, taskId);
+ reportingTaskNode = createReportingTask(taskClass, taskId, false);
} catch (final ReportingTaskInstantiationException e) {
logger.error("Unable to load reporting task {} due to {}", new Object[]{taskId, e});
if (logger.isDebugEnabled()) {
@@ -922,27 +968,61 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final ReportingTask reportingTask = reportingTaskNode.getReportingTask();
- final ReportingInitializationContext config = new StandardReportingInitializationContext(taskId, taskName, schedulingStrategy, taskSchedulingPeriod, this);
+ final ComponentLog componentLog = new SimpleProcessLogger(taskId, reportingTask);
+ final ReportingInitializationContext config = new StandardReportingInitializationContext(taskId, taskName,
+ schedulingStrategy, taskSchedulingPeriod, componentLog, this);
reportingTask.initialize(config);
+ final String annotationData = DomUtils.getChildText(taskElement, "annotationData");
+ if ( annotationData != null ) {
+ reportingTaskNode.setAnnotationData(annotationData.trim());
+ }
+
final Map<PropertyDescriptor, String> resolvedProps;
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
resolvedProps = new HashMap<>();
for (final Map.Entry<String, String> entry : properties.entrySet()) {
final PropertyDescriptor descriptor = reportingTask.getPropertyDescriptor(entry.getKey());
- resolvedProps.put(descriptor, entry.getValue());
+ if ( entry.getValue() == null ) {
+ resolvedProps.put(descriptor, descriptor.getDefaultValue());
+ } else {
+ resolvedProps.put(descriptor, entry.getValue());
+ }
}
}
for (final Map.Entry<PropertyDescriptor, String> entry : resolvedProps.entrySet()) {
- reportingTaskNode.setProperty(entry.getKey().getName(), entry.getValue());
+ if ( entry.getValue() != null ) {
+ reportingTaskNode.setProperty(entry.getKey().getName(), entry.getValue());
+ }
+ }
+
+ final String comments = DomUtils.getChildText(taskElement, "comment");
+ if ( comments != null ) {
+ reportingTaskNode.setComments(comments);
}
- processScheduler.schedule(reportingTaskNode);
- tasks.add(reportingTaskNode);
+ reportingTaskNode.setScheduledState(scheduledState);
+ if ( ScheduledState.RUNNING.equals(scheduledState) ) {
+ if ( reportingTaskNode.isValid() ) {
+ try {
+ processScheduler.schedule(reportingTaskNode);
+ } catch (final Exception e) {
+ logger.error("Failed to start {} due to {}", reportingTaskNode, e);
+ if ( logger.isDebugEnabled() ) {
+ logger.error("", e);
+ }
+ }
+ } else {
+ logger.error("Failed to start {} because it is invalid due to {}", reportingTaskNode, reportingTaskNode.getValidationErrors());
+ }
+ }
+
+
+ tasks.put(reportingTaskNode.getIdentifier(), reportingTaskNode);
}
} catch (final SAXException | ParserConfigurationException | IOException | DOMException | NumberFormatException | InitializationException t) {
- logger.error("Unable to load reporting tasks from {} due to {}", new Object[]{taskConfigXml, t});
+ logger.error("Unable to load reporting tasks due to {}", new Object[]{t});
if (logger.isDebugEnabled()) {
logger.error("", t);
}
@@ -951,7 +1031,9 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
return tasks;
}
- private ReportingTaskNode createReportingTask(final String type, final String id) throws ReportingTaskInstantiationException {
+
+ @Override
+ public ReportingTaskNode createReportingTask(final String type, final String id, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
if (type == null) {
throw new NullPointerException();
}
@@ -981,14 +1063,22 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this);
final ReportingTaskNode taskNode = new ClusteredReportingTaskNode(task, id, processScheduler,
new ClusteredEventAccess(this), bulletinRepository, controllerServiceProvider, validationContextFactory);
+ taskNode.setName(task.getClass().getSimpleName());
+
+ reportingTasks.put(id, taskNode);
+ if ( firstTimeAdded ) {
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
+ ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, task);
+ } catch (final Exception e) {
+ throw new ProcessorLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + task, e);
+ }
+ }
+
return taskNode;
}
- private Document parse(final File xmlFile, final URL schemaUrl) throws SAXException, ParserConfigurationException, IOException {
- final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
- final Schema schema = schemaFactory.newSchema(schemaUrl);
+ private Document parse(final byte[] serialized) throws SAXException, ParserConfigurationException, IOException {
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
- docFactory.setSchema(schema);
final DocumentBuilder builder = docFactory.newDocumentBuilder();
builder.setErrorHandler(new org.xml.sax.ErrorHandler() {
@@ -1021,12 +1111,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
});
// build the document
- final Document document = builder.parse(xmlFile);
-
- // ensure schema compliance
- final Validator validator = schema.newValidator();
- validator.validate(new DOMSource(document));
-
+ final Document document = builder.parse(new ByteArrayInputStream(serialized));
return document;
}
@@ -1287,7 +1372,19 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
writeLock.unlock("handleControllerStartupFailure");
}
}
-
+
+ /**
+ * Adds an instance of a specified controller service.
+ *
+ * @param type the type of controller service to create; handed to the delegate provider
+ * @param id the identifier for the new controller service; handed to the delegate provider
+ * @param firstTimeAdded flag forwarded unchanged to the delegate provider
+ * @return the node returned by the delegate controllerServiceProvider
+ */
+ @Override
+ public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
+ return controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+ }
@Override
public ControllerService getControllerService(String serviceIdentifier) {
@@ -1310,11 +1407,16 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
@Override
- public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
- return controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+ public boolean isControllerServiceEnabling(final String serviceIdentifier) { // pure delegation: the answer comes from controllerServiceProvider
+ return controllerServiceProvider.isControllerServiceEnabling(serviceIdentifier);
}
@Override
+ public String getControllerServiceName(final String serviceIdentifier) { // pure delegation: the name is looked up by controllerServiceProvider
+ return controllerServiceProvider.getControllerServiceName(serviceIdentifier);
+ }
+
+ @Override
public void removeControllerService(final ControllerServiceNode serviceNode) { // forwards the removal to controllerServiceProvider
controllerServiceProvider.removeControllerService(serviceNode);
}
@@ -1326,10 +1428,214 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
@Override
+ public void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes) { // forwards the whole collection to controllerServiceProvider
+ controllerServiceProvider.enableControllerServices(serviceNodes);
+ }
+
+ @Override
public void disableControllerService(final ControllerServiceNode serviceNode) { // forwards the disable request to controllerServiceProvider
controllerServiceProvider.disableControllerService(serviceNode);
}
+ @Override
+ public Set<ControllerServiceNode> getAllControllerServices() { // returns whatever set controllerServiceProvider reports; no copy is made here
+ return controllerServiceProvider.getAllControllerServices();
+ }
+
+
+ @Override
+ public void disableReferencingServices(final ControllerServiceNode serviceNode) { // delegates to controllerServiceProvider
+ controllerServiceProvider.disableReferencingServices(serviceNode);
+ }
+
+ @Override
+ public void enableReferencingServices(final ControllerServiceNode serviceNode) { // delegates to controllerServiceProvider
+ controllerServiceProvider.enableReferencingServices(serviceNode);
+ }
+
+ @Override
+ public void scheduleReferencingComponents(final ControllerServiceNode serviceNode) { // delegates to controllerServiceProvider
+ controllerServiceProvider.scheduleReferencingComponents(serviceNode);
+ }
+
+ @Override
+ public void unscheduleReferencingComponents(final ControllerServiceNode serviceNode) { // delegates to controllerServiceProvider
+ controllerServiceProvider.unscheduleReferencingComponents(serviceNode);
+ }
+
+ @Override
+ public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) { // check is performed by controllerServiceProvider; presumably throws on failure - confirm
+ controllerServiceProvider.verifyCanEnableReferencingServices(serviceNode);
+ }
+
+ @Override
+ public void verifyCanScheduleReferencingComponents(final ControllerServiceNode serviceNode) { // check is performed by controllerServiceProvider; presumably throws on failure - confirm
+ controllerServiceProvider.verifyCanScheduleReferencingComponents(serviceNode);
+ }
+
+ @Override
+ public void verifyCanDisableReferencingServices(final ControllerServiceNode serviceNode) { // check is performed by controllerServiceProvider; presumably throws on failure - confirm
+ controllerServiceProvider.verifyCanDisableReferencingServices(serviceNode);
+ }
+
+ @Override
+ public void verifyCanStopReferencingComponents(final ControllerServiceNode serviceNode) { // check is performed by controllerServiceProvider; presumably throws on failure - confirm
+ controllerServiceProvider.verifyCanStopReferencingComponents(serviceNode);
+ }
+
+ private byte[] serialize(final Document doc) throws TransformerException { // writes the given DOM document into an in-memory buffer and returns its bytes
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final DOMSource domSource = new DOMSource(doc);
+ final StreamResult streamResult = new StreamResult(baos);
+
+ // configure the transformer and convert the DOM
+ final TransformerFactory transformFactory = TransformerFactory.newInstance();
+ final Transformer transformer = transformFactory.newTransformer(); // no stylesheet is supplied, so the DOM is copied to the result as-is
+ transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2"); // implementation-specific key in the Apache XSLT namespace; presumably a 2-space indent where supported - confirm
+ transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+
+ // transform the document to byte stream
+ transformer.transform(domSource, streamResult);
+ return baos.toByteArray();
+ }
+
+ private byte[] serializeControllerServices() throws ParserConfigurationException, TransformerException { // builds a fresh DOM rooted at <controllerServices> and returns it as bytes
+ final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
+ final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
+ final Document document = docBuilder.newDocument();
+ final Element rootElement = document.createElement("controllerServices");
+ document.appendChild(rootElement);
+
+ for ( final ControllerServiceNode serviceNode : getAllControllerServices() ) { // one child entry per known controller service
+ StandardFlowSerializer.addControllerService(rootElement, serviceNode, encryptor); // element layout is owned by StandardFlowSerializer; encryptor presumably protects sensitive values - confirm
+ }
+
+ return serialize(document);
+ }
+
+ private byte[] serializeReportingTasks() throws ParserConfigurationException, TransformerException { // builds a fresh DOM rooted at <reportingTasks> and returns it as bytes
+ final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
+ final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
+ final Document document = docBuilder.newDocument();
+ final Element rootElement = document.createElement("reportingTasks"); // same root tag name that loadReportingTasks looks up when parsing
+ document.appendChild(rootElement);
+
+ for ( final ReportingTaskNode taskNode : getAllReportingTasks() ) { // one child entry per registered reporting task
+ StandardFlowSerializer.addReportingTask(rootElement, taskNode, encryptor); // element layout is owned by StandardFlowSerializer; encryptor presumably protects sensitive values - confirm
+ }
+
+ return serialize(document);
+ }
+
+
+ public void saveControllerServices() { // hands the serialized controller services to dataFlowManagementService; never throws
+ try {
+ dataFlowManagementService.updateControllerServices(serializeControllerServices());
+ } catch (final Exception e) { // best-effort save: any failure is reported below instead of propagating
+ logger.error("Failed to save changes to NCM's Controller Services; changes may be lost on restart due to " + e);
+ if ( logger.isDebugEnabled() ) {
+ logger.error("", e); // stack trace is emitted only when debug logging is on
+ }
+
+ getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Controller Services", Severity.ERROR.name(), // also raise an ERROR bulletin for the failure
+ "Failed to save changes to NCM's Controller Services; changes may be lost on restart. See logs for more details."));
+ }
+ }
+
+ public void saveReportingTasks() { // hands the serialized reporting tasks to dataFlowManagementService; never throws
+ try {
+ dataFlowManagementService.updateReportingTasks(serializeReportingTasks());
+ } catch (final Exception e) { // best-effort save: any failure is reported below instead of propagating
+ logger.error("Failed to save changes to NCM's Reporting Tasks; changes may be lost on restart due to " + e);
+ if ( logger.isDebugEnabled() ) {
+ logger.error("", e); // stack trace is emitted only when debug logging is on
+ }
+
+ getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Reporting Tasks", Severity.ERROR.name(), // also raise an ERROR bulletin for the failure
+ "Failed to save changes to NCM's Reporting Tasks; changes may be lost on restart. See logs for more details."));
+ }
+ }
+
+ @Override
+ public Set<ReportingTaskNode> getAllReportingTasks() { // returns a new HashSet copy, so callers never hold the live map's values view
+ readLock.lock();
+ try {
+ return new HashSet<>(reportingTasks.values());
+ } finally {
+ readLock.unlock("getAllReportingTasks"); // label matches the enclosing method name, as the other unlock(...) calls in this class do
+ }
+ }
+
+ @Override
+ public ReportingTaskNode getReportingTaskNode(final String taskId) { // looks the task up by identifier while holding the read lock
+ readLock.lock();
+ try {
+ return reportingTasks.get(taskId); // null when no task is registered under this id (Map.get semantics)
+ } finally {
+ readLock.unlock("getReportingTaskNode");
+ }
+ }
+
+ @Override
+ public void startReportingTask(final ReportingTaskNode reportingTaskNode) { // verifies, then schedules the task on processScheduler
+ reportingTaskNode.verifyCanStart(); // runs before scheduling; presumably throws if the task may not start - confirm
+ processScheduler.schedule(reportingTaskNode);
+ }
+
+
+ @Override
+ public void stopReportingTask(final ReportingTaskNode reportingTaskNode) { // verifies, then unschedules the task from processScheduler
+ reportingTaskNode.verifyCanStop(); // runs before unscheduling; presumably throws if the task may not stop - confirm
+ processScheduler.unschedule(reportingTaskNode);
+ }
+
+ @Override
+ public void removeReportingTask(final ReportingTaskNode reportingTaskNode) { // unregisters the task under the write lock after lifecycle and reference cleanup
+ writeLock.lock();
+ try {
+ final ReportingTaskNode existing = reportingTasks.get(reportingTaskNode.getIdentifier());
+ if ( existing == null || existing != reportingTaskNode ) { // identity check: the registered instance must be this exact node
+ throw new IllegalStateException("Reporting Task " + reportingTaskNode + " does not exist in this Flow");
+ }
+
+ reportingTaskNode.verifyCanDelete(); // presumably throws if deletion is not currently allowed - confirm
+
+ try (final NarCloseable x = NarCloseable.withNarLoader()) { // OnRemoved callbacks run inside the withNarLoader() scope
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, reportingTaskNode.getReportingTask(), reportingTaskNode.getConfigurationContext());
+ }
+
+ for ( final Map.Entry<PropertyDescriptor, String> entry : reportingTaskNode.getProperties().entrySet() ) { // release any controller-service references held via properties
+ final PropertyDescriptor descriptor = entry.getKey();
+ if (descriptor.getControllerServiceDefinition() != null ) { // only properties that identify a controller service matter here
+ final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue(); // fall back to the descriptor default when no value is set
+ if ( value != null ) {
+ final ControllerServiceNode serviceNode = controllerServiceProvider.getControllerServiceNode(value); // the property value is used as the service identifier
+ if ( serviceNode != null ) {
+ serviceNode.removeReference(reportingTaskNode);
+ }
+ }
+ }
+ }
+
+ reportingTasks.remove(reportingTaskNode.getIdentifier()); // finally drop the task from the registry
+ } finally {
+ writeLock.unlock("removeReportingTask");
+ }
+ }
+
+
+ @Override
+ public void disableReportingTask(final ReportingTaskNode reportingTask) { // verifies, then asks processScheduler to disable the task
+ reportingTask.verifyCanDisable(); // runs first; presumably throws if the task may not be disabled - confirm
+ processScheduler.disableReportingTask(reportingTask);
+ }
+
+ @Override
+ public void enableReportingTask(final ReportingTaskNode reportingTask) { // verifies, then asks processScheduler to enable the task
+ reportingTask.verifyCanEnable(); // runs first; presumably throws if the task may not be enabled - confirm
+ processScheduler.enableReportingTask(reportingTask);
+ }
+
/**
* Handle a bulletins message.
@@ -1966,65 +2272,114 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
// check if this request can change the flow
final boolean mutableRequest = canChangeNodeState(method, uri);
- // update headers to contain cluster contextual information to send to the node
- final Map<String, String> updatedHeaders = new HashMap<>(headers);
- final ClusterContext clusterCtx = new ClusterContextImpl();
- clusterCtx.setRequestSentByClusterManager(true); // indicate request is sent from cluster manager
- clusterCtx.setRevision(revision);
+ final ObjectHolder<NodeResponse> holder = new ObjectHolder<>(null);
+ final UpdateRevision federateRequest = new UpdateRevision() {
+ @Override
+ public Revision execute(Revision currentRevision) {
+ // update headers to contain cluster contextual information to send to the node
+ final Map<String, String> updatedHeaders = new HashMap<>(headers);
+ final ClusterContext clusterCtx = new ClusterContextImpl();
+ clusterCtx.setRequestSentByClusterManager(true); // indicate request is sent from cluster manager
+ clusterCtx.setRevision(currentRevision);
+
+ // serialize cluster context and add to request header
+ final String serializedClusterCtx = WebUtils.serializeObjectToHex(clusterCtx);
+ updatedHeaders.put(CLUSTER_CONTEXT_HTTP_HEADER, serializedClusterCtx);
+
+ // if the request is mutable, we need to verify that it is a valid request for all nodes in the cluster.
+ if (mutableRequest) {
+ updatedHeaders.put(NCM_EXPECTS_HTTP_HEADER, "150-NodeContinue");
- // serialize cluster context and add to request header
- final String serializedClusterCtx = WebUtils.serializeObjectToHex(clusterCtx);
- updatedHeaders.put(CLUSTER_CONTEXT_HTTP_HEADER, serializedClusterCtx);
+ final Set<NodeResponse> nodeResponses;
+ if (entity == null) {
+ nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, parameters, updatedHeaders);
+ } else {
+ nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, entity, updatedHeaders);
+ }
- // if the request is mutable, we need to verify that it is a valid request for all nodes in the cluster.
- if (mutableRequest) {
- updatedHeaders.put(NCM_EXPECTS_HTTP_HEADER, "150-NodeContinue");
+ updatedHeaders.remove(NCM_EXPECTS_HTTP_HEADER);
- final Set<NodeResponse> nodeResponses;
- if (entity == null) {
- nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, parameters, updatedHeaders);
- } else {
- nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, entity, updatedHeaders);
- }
+ for (final NodeResponse response : nodeResponses) {
+ if (response.getStatus() != NODE_CONTINUE_STATUS_CODE) {
+ final String nodeDescription = response.getNodeId().getApiAddress() + ":" + response.getNodeId().getApiPort();
+ final ClientResponse clientResponse = response.getClientResponse();
+ if (clientResponse == null) {
+ throw new IllegalClusterStateException("Node " + nodeDescription + " is unable to fulfill this request due to: Unexpected Response Code " + response.getStatus());
+ }
+ final String nodeExplanation = clientResponse.getEntity(String.class);
+ throw new IllegalClusterStateException("Node " + nodeDescription + " is unable to fulfill this request due to: " + nodeExplanation, response.getThrowable());
+ }
+ }
- updatedHeaders.remove(NCM_EXPECTS_HTTP_HEADER);
+ // set flow state to unknown to denote a mutable request replication in progress
+ logger.debug("Setting Flow State to UNKNOWN due to mutable request to {} {}", method, uri);
+ notifyDataFlowManagmentServiceOfFlowStateChange(PersistedFlowState.UNKNOWN);
+ }
- for (final NodeResponse response : nodeResponses) {
- if (response.getStatus() != NODE_CONTINUE_STATUS_CODE) {
- final String nodeDescription = response.getNodeId().getApiAddress() + ":" + response.getNodeId().getApiPort();
- final ClientResponse clientResponse = response.getClientResponse();
- if (clientResponse == null) {
- throw new IllegalClusterStateException("Node " + nodeDescription + " is unable to fulfill this request due to: Unexpected Response Code " + response.getStatus());
+ // replicate request
+ final Set<NodeResponse> nodeResponses;
+ try {
+ if (entity == null) {
+ nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, parameters, updatedHeaders);
+ } else {
+ nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, entity, updatedHeaders);
+ }
+ } catch (final UriConstructionException uce) {
+ // request was not replicated, so mark the flow with its original state
+ if (mutableRequest) {
+ notifyDataFlowManagmentServiceOfFlowStateChange(originalPersistedFlowState);
}
- final String nodeExplanation = clientResponse.getEntity(String.class);
- throw new IllegalClusterStateException("Node " + nodeDescription + " is unable to fulfill this request due to: " + nodeExplanation, response.getThrowable());
- }
- }
- // set flow state to unknown to denote a mutable request replication in progress
- logger.debug("Setting Flow State to UNKNOWN due to mutable request to {} {}", method, uri);
- notifyDataFlowManagmentServiceOfFlowStateChange(PersistedFlowState.UNKNOWN);
- }
+ throw uce;
+ }
- // replicate request
- final Set<NodeResponse> nodeResponses;
- try {
- if (entity == null) {
- nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, parameters, updatedHeaders);
- } else {
- nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, entity, updatedHeaders);
- }
- } catch (final UriConstructionException uce) {
- // request was not replicated, so mark the flow with its original state
- if (mutableRequest) {
- notifyDataFlowManagmentServiceOfFlowStateChange(originalPersistedFlowState);
+ // merge the response
+ final NodeResponse clientResponse = mergeResponses(uri, method, nodeResponses, mutableRequest);
+ holder.set(clientResponse);
+
+ // if we have a response get the updated cluster context for auditing and revision updating
+ Revision updatedRevision = null;
+ if (mutableRequest && clientResponse != null) {
+ try {
+ // get the cluster context from the response header
+ final String serializedClusterContext = clientResponse.getClientResponse().getHeaders().getFirst(CLUSTER_CONTEXT_HTTP_HEADER);
+ if (StringUtils.isNotBlank(serializedClusterContext)) {
+ // deserialize object
+ final Serializable clusterContextObj = WebUtils.deserializeHexToObject(serializedClusterContext);
+
+ // if we have a valid object, audit the actions
+ if (clusterContextObj instanceof ClusterContext) {
+ final ClusterContext clusterContext = (ClusterContext) clusterContextObj;
+ if (auditService != null) {
+ try {
+ auditService.addActions(clusterContext.getActions());
+ } catch (Throwable t) {
+ logger.warn("Unable to record actions: " + t.getMessage());
+ if (logger.isDebugEnabled()) {
+ logger.warn(StringUtils.EMPTY, t);
+ }
+ }
+ }
+ updatedRevision = clusterContext.getRevision();
+ }
+ }
+ } catch (final ClassNotFoundException cnfe) {
+ logger.warn("Classpath issue detected because failed to deserialize cluster context from node response due to: " + cnfe, cnfe);
+ }
+ }
+
+ return updatedRevision;
}
-
- throw uce;
+ };
+
+ // federate the request and lock on the revision
+ if (mutableRequest) {
+ optimisticLockingManager.setRevision(federateRequest);
+ } else {
+ federateRequest.execute(optimisticLockingManager.getLastModification().getRevision());
}
-
- final NodeResponse clientResponse = mergeResponses(uri, method, nodeResponses, mutableRequest);
- return clientResponse;
+
+ return holder.get();
}
private static boolean isProcessorsEndpoint(final URI uri, final String method) {
@@ -2032,7 +2387,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
private static boolean isProcessorEndpoint(final URI uri, final String method) {
- if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches()) {
+ if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && (PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches() || CLUSTER_PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches()) ) {
return true;
} else if ("POST".equalsIgnoreCase(method) && PROCESSORS_URI_PATTERN.matcher(uri.getPath()).matches()) {
return true;
@@ -2079,13 +2434,51 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
private static boolean isProvenanceEventEndpoint(final URI uri, final String method) {
    // only GET requests qualify; the path is not inspected for any other method
    if (!"GET".equalsIgnoreCase(method)) {
        return false;
    }

    return PROVENANCE_EVENT_URI.matcher(uri.getPath()).matches();
}
+
+ private static boolean isControllerServicesEndpoint(final URI uri, final String method) {
+     // only GET requests qualify; the path is not inspected for any other method
+     if (!"GET".equalsIgnoreCase(method)) {
+         return false;
+     }
+
+     // the path must equal CONTROLLER_SERVICES_URI exactly
+     return CONTROLLER_SERVICES_URI.equals(uri.getPath());
+ }
+
+ private static boolean isControllerServiceEndpoint(final URI uri, final String method) {
+     // GET or PUT on a path matching CONTROLLER_SERVICE_URI_PATTERN,
+     // or POST on a path equal to CONTROLLER_SERVICES_URI
+     return (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_URI_PATTERN.matcher(uri.getPath()).matches())
+             || ("POST".equalsIgnoreCase(method) && CONTROLLER_SERVICES_URI.equals(uri.getPath()));
+ }
+
+ private static boolean isControllerServiceReferenceEndpoint(final URI uri, final String method) {
+     // GET or PUT on a path matching CONTROLLER_SERVICE_REFERENCES_URI_PATTERN
+     return ("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method))
+             && CONTROLLER_SERVICE_REFERENCES_URI_PATTERN.matcher(uri.getPath()).matches();
+ }
+
+ private static boolean isReportingTasksEndpoint(final URI uri, final String method) {
+     // only GET requests qualify; the path is not inspected for any other method
+     if (!"GET".equalsIgnoreCase(method)) {
+         return false;
+     }
+
+     // the path must equal REPORTING_TASKS_URI exactly
+     return REPORTING_TASKS_URI.equals(uri.getPath());
+ }
+
+ private static boolean isReportingTaskEndpoint(final URI uri, final String method) {
+     final boolean readOrUpdate = "GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method);
+     if (readOrUpdate) {
+         // GET/PUT: the path must match REPORTING_TASK_URI_PATTERN (the POST branch cannot apply here)
+         return REPORTING_TASK_URI_PATTERN.matcher(uri.getPath()).matches();
+     }
+
+     // POST: the path must equal REPORTING_TASKS_URI exactly
+     return "POST".equalsIgnoreCase(method) && REPORTING_TASKS_URI.equals(uri.getPath());
+ }
// Returns true when the URI/method pair satisfies any of the endpoint predicates OR-ed together below.
// This hunk extends the chain with the controller service and reporting task predicates.
// NOTE(review): presumably these are the endpoints whose node responses mergeResponses() merges - confirm against callers.
static boolean isResponseInterpreted(final URI uri, final String method) {
return isProcessorsEndpoint(uri, method) || isProcessorEndpoint(uri, method)
|| isRemoteProcessGroupsEndpoint(uri, method) || isRemoteProcessGroupEndpoint(uri, method)
|| isProcessGroupEndpoint(uri, method)
|| isTemplateEndpoint(uri, method) || isFlowSnippetEndpoint(uri, method)
- || isProvenanceQueryEndpoint(uri, method) || isProvenanceEventEndpoint(uri, method);
+ || isProvenanceQueryEndpoint(uri, method) || isProvenanceEventEndpoint(uri, method)
+ || isControllerServicesEndpoint(uri, method) || isControllerServiceEndpoint(uri, method) || isControllerServiceReferenceEndpoint(uri, method)
+ || isReportingTasksEndpoint(uri, method) || isReportingTaskEndpoint(uri, method);
}
private void mergeProcessorValidationErrors(final ProcessorDTO processor, Map<NodeIdentifier, ProcessorDTO> processorMap) {
@@ -2095,37 +2488,12 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final NodeIdentifier nodeId = nodeEntry.getKey();
final ProcessorDTO nodeProcessor = nodeEntry.getValue();
- // get the processor's validation errors and put them into a map
- // where the key is the validation error and the value is the set of all
- // nodes that reported that validation error.
- final Collection<String> nodeValidationErrors = nodeProcessor.getValidationErrors();
- if (nodeValidationErrors != null) {
- for (final String nodeValidationError : nodeValidationErrors) {
- Set<NodeIdentifier> nodeSet = validationErrorMap.get(nodeValidationError);
- if (nodeSet == null) {
- nodeSet = new HashSet<>();
- validationErrorMap.put(nodeValidationError, nodeSet);
- }
- nodeSet.add(nodeId);
- }
- }
- }
-
- final Set<String> normalizedValidationErrors = new HashSet<>();
- for (final Map.Entry<String, Set<NodeIdentifier>> validationEntry : validationErrorMap.entrySet()) {
- final String msg = validationEntry.getKey();
- final Set<NodeIdentifier> nodeIds = validationEntry.getValue();
-
- if (nodeIds.size() == processorMap.size()) {
- normalizedValidationErrors.add(msg);
- } else {
- for (final NodeIdentifier nodeId : nodeIds) {
- normalizedValidationErrors.add(nodeId.getApiAddress() + ":" + nodeId.getApiPort() + " -- " + msg);
- }
- }
+ // merge the validation errors
+ mergeValidationErrors(validationErrorMap, nodeId, nodeProcessor.getValidationErrors());
}
- processor.setValidationErrors(normalizedValidationErrors);
+ // set the merged validation errors
+ processor.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, processorMap.size()));
}
private void mergeProvenanceQueryResults(final ProvenanceDTO provenanceDto, final Map<NodeIdentifier, ProvenanceDTO> resultMap, final Set<NodeResponse> problematicResponses) {
@@ -2293,7 +2661,158 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
remoteProcessGroup.setAuthorizationIssues(mergedAuthorizationIssues);
}
}
+
+ /**
+  * Folds each node's view of the components that reference a controller service into the
+  * client-facing set. Per component id, active thread counts greater than zero are summed across
+  * nodes, and the first DISABLING or ENABLING state encountered while iterating the nodes is
+  * applied. A component for which no node reported either is left untouched.
+  *
+  * @param referencingComponents the components to update in place; a {@code null} or empty set is a no-op
+  * @param referencingComponentMap each node's referencing components; {@code null} values are skipped
+  */
+ private void mergeControllerServiceReferences(final Set<ControllerServiceReferencingComponentDTO> referencingComponents, final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> referencingComponentMap) {
+     // a node's set may be null (handled below), so tolerate a null target set the same way
+     if (referencingComponents == null || referencingComponents.isEmpty()) {
+         return;
+     }
+
+     final Map<String, Integer> activeThreadCounts = new HashMap<>();
+     final Map<String, String> states = new HashMap<>();
+     for (final Set<ControllerServiceReferencingComponentDTO> nodeReferencingComponents : referencingComponentMap.values()) {
+         // a node may not have reported any referencing components
+         if (nodeReferencingComponents == null) {
+             continue;
+         }
+
+         for (final ControllerServiceReferencingComponentDTO nodeReferencingComponent : nodeReferencingComponents) {
+             final String componentId = nodeReferencingComponent.getId();
+
+             // accumulate active thread counts; absent or zero counts contribute nothing
+             if (nodeReferencingComponent.getActiveThreadCount() != null && nodeReferencingComponent.getActiveThreadCount() > 0) {
+                 final Integer current = activeThreadCounts.get(componentId);
+                 if (current == null) {
+                     activeThreadCounts.put(componentId, nodeReferencingComponent.getActiveThreadCount());
+                 } else {
+                     activeThreadCounts.put(componentId, nodeReferencingComponent.getActiveThreadCount() + current);
+                 }
+             }
+
+             // remember the first transitional state seen for this component
+             if (states.get(componentId) == null) {
+                 if (ControllerServiceState.DISABLING.name().equals(nodeReferencingComponent.getState())) {
+                     states.put(componentId, ControllerServiceState.DISABLING.name());
+                 } else if (ControllerServiceState.ENABLING.name().equals(nodeReferencingComponent.getState())) {
+                     states.put(componentId, ControllerServiceState.ENABLING.name());
+                 }
+             }
+         }
+     }
+
+     // apply the merged values to the client-facing components
+     for (final ControllerServiceReferencingComponentDTO referencingComponent : referencingComponents) {
+         final Integer activeThreadCount = activeThreadCounts.get(referencingComponent.getId());
+         if (activeThreadCount != null) {
+             referencingComponent.setActiveThreadCount(activeThreadCount);
+         }
+
+         final String state = states.get(referencingComponent.getId());
+         if (state != null) {
+             referencingComponent.setState(state);
+         }
+     }
+ }
+
+ /**
+  * Merges the per-node representations of a single controller service into the client-facing DTO.
+  * The first DISABLING or ENABLING state encountered while iterating the nodes is applied, the
+  * referencing components are merged via {@code mergeControllerServiceReferences}, and validation
+  * errors are merged and normalized against the number of nodes.
+  *
+  * @param controllerService the DTO returned to the client; updated in place
+  * @param controllerServiceMap the same controller service as reported by each node
+  */
+ private void mergeControllerService(final ControllerServiceDTO controllerService, final Map<NodeIdentifier, ControllerServiceDTO> controllerServiceMap) {
+     final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>();
+     final Set<ControllerServiceReferencingComponentDTO> referencingComponents = controllerService.getReferencingComponents();
+     final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> nodeReferencingComponentsMap = new HashMap<>();
+
+     String state = null;
+     for (final Map.Entry<NodeIdentifier, ControllerServiceDTO> nodeEntry : controllerServiceMap.entrySet()) {
+         final NodeIdentifier nodeId = nodeEntry.getKey();
+         final ControllerServiceDTO nodeControllerService = nodeEntry.getValue();
+
+         // remember the first transitional state reported by any node
+         if (state == null) {
+             if (ControllerServiceState.DISABLING.name().equals(nodeControllerService.getState())) {
+                 state = ControllerServiceState.DISABLING.name();
+             } else if (ControllerServiceState.ENABLING.name().equals(nodeControllerService.getState())) {
+                 state = ControllerServiceState.ENABLING.name();
+             }
+         }
+
+         // Record this node's own referencing components (possibly null; the reference merge skips
+         // null sets). Storing each component's nested references here instead would overwrite the
+         // entry on every iteration and would not line up with the ids in referencingComponents.
+         nodeReferencingComponentsMap.put(nodeId, nodeControllerService.getReferencingComponents());
+
+         // merge the validation errors
+         mergeValidationErrors(validationErrorMap, nodeId, nodeControllerService.getValidationErrors());
+     }
+
+     // merge the referencing components, if the client-facing DTO carries any
+     if (referencingComponents != null) {
+         mergeControllerServiceReferences(referencingComponents, nodeReferencingComponentsMap);
+     }
+
+     // store the 'transition' state if applicable
+     if (state != null) {
+         controllerService.setState(state);
+     }
+
+     // set the merged validation errors
+     controllerService.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, controllerServiceMap.size()));
+ }
+
+ /**
+  * Merges the per-node representations of a single reporting task into the client-facing DTO:
+  * active thread counts are summed across nodes and validation errors are merged and normalized
+  * against the number of nodes.
+  *
+  * @param reportingTask the DTO returned to the client; updated in place
+  * @param reportingTaskMap the same reporting task as reported by each node
+  */
+ private void mergeReportingTask(final ReportingTaskDTO reportingTask, final Map<NodeIdentifier, ReportingTaskDTO> reportingTaskMap) {
+     final Map<String, Set<NodeIdentifier>> errorsByMessage = new HashMap<>();
+     int totalActiveThreads = 0;
+
+     for (final Map.Entry<NodeIdentifier, ReportingTaskDTO> perNode : reportingTaskMap.entrySet()) {
+         final ReportingTaskDTO nodeTask = perNode.getValue();
+
+         // nodes that report no thread count contribute nothing to the total
+         if (nodeTask.getActiveThreadCount() != null) {
+             totalActiveThreads += nodeTask.getActiveThreadCount();
+         }
+
+         mergeValidationErrors(errorsByMessage, perNode.getKey(), nodeTask.getValidationErrors());
+     }
+
+     reportingTask.setActiveThreadCount(totalActiveThreads);
+     reportingTask.setValidationErrors(normalizedMergedValidationErrors(errorsByMessage, reportingTaskMap.size()));
+ }
+ /**
+  * Records, for every validation error reported by a node, that the node reported it.
+  *
+  * @param validationErrorMap map from validation error message to the set of nodes that reported it; updated in place
+  * @param nodeId identifier of the node whose errors are being recorded
+  * @param nodeValidationErrors the node's validation errors; {@code null} is treated as no errors
+  */
+ public void mergeValidationErrors(final Map<String, Set<NodeIdentifier>> validationErrorMap, final NodeIdentifier nodeId, final Collection<String> nodeValidationErrors) {
+     if (nodeValidationErrors == null) {
+         return;
+     }
+
+     for (final String error : nodeValidationErrors) {
+         // create the reporter set the first time a message is seen
+         Set<NodeIdentifier> reporters = validationErrorMap.get(error);
+         if (reporters == null) {
+             reporters = new HashSet<>();
+             validationErrorMap.put(error, reporters);
+         }
+         reporters.add(nodeId);
+     }
+ }
+
+ /**
+  * Normalizes merged validation errors. A message reported by exactly {@code totalNodes} nodes is
+  * kept as-is; any other message is emitted once per reporting node, prefixed with that node's
+  * API address and port.
+  *
+  * @param validationErrorMap map from validation error message to the set of nodes that reported it
+  * @param totalNodes the number of nodes a message must be reported by to be emitted without a prefix
+  * @return a new set containing the normalized messages
+  */
+ public Set<String> normalizedMergedValidationErrors(final Map<String, Set<NodeIdentifier>> validationErrorMap, int totalNodes) {
+     final Set<String> normalized = new HashSet<>();
+
+     for (final Map.Entry<String, Set<NodeIdentifier>> reported : validationErrorMap.entrySet()) {
+         final String message = reported.getKey();
+         final Set<NodeIdentifier> reporters = reported.getValue();
+
+         if (reporters.size() != totalNodes) {
+             // not every node agrees: attribute the message to each node that reported it
+             for (final NodeIdentifier reporter : reporters) {
+                 normalized.add(reporter.getApiAddress() + ":" + reporter.getApiPort() + " -- " + message);
+             }
+             continue;
+         }
+
+         normalized.add(message);
+     }
+
+     return normalized;
+ }
+
// requires write lock to be already acquired unless request is not mutable
private NodeResponse mergeResponses(final URI uri, final String method, final Set<NodeResponse> nodeResponses, final boolean mutableRequest) {
// holds the one response of all the node responses to return to the client
@@ -2583,6 +3102,126 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
event.setClusterNodeAddress(nodeId.getApiAddress() + ":" + nodeId.getApiPort());
clientResponse = new NodeResponse(clientResponse, responseEntity);
+ } else if (hasSuccessfulClientResponse && isControllerServiceEndpoint(uri, method)) {
+ final ControllerServiceEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerServiceEntity.class);
+ final ControllerServiceDTO controllerService = responseEntity.getControllerService();
+
+ final Map<NodeIdentifier, ControllerServiceDTO> resultsMap = new HashMap<>();
+ for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+ if (problematicNodeResponses.contains(nodeResponse)) {
+ continue;
+ }
+
+ final ControllerServiceEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceEntity.class);
+ final ControllerServiceDTO nodeControllerService = nodeResponseEntity.getControllerService();
+
+ resultsMap.put(nodeResponse.getNodeId(), nodeControllerService);
+ }
+ mergeControllerService(controllerService, resultsMap);
+
+ clientResponse = new NodeResponse(clientResponse, responseEntity);
+ } else if (hasSuccessfulClientResponse && isControllerServicesEndpoint(uri, method)) {
+ final ControllerServicesEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerServicesEntity.class);
+ final Set<ControllerServiceDTO> controllerServices = responseEntity.getControllerServices();
+
+ final Map<String, Map<NodeIdentifier, ControllerServiceDTO>> controllerServiceMap = new HashMap<>();
+ for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+ if (problematicNodeResponses.contains(nodeResponse)) {
+ continue;
+ }
+
+ final ControllerServicesEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServicesEntity.class);
+ final Set<ControllerServiceDTO> nodeControllerServices = nodeResponseEntity.getControllerServices();
+
+ for (final ControllerServiceDTO nodeControllerService : nodeControllerServices) {
+ Map<NodeIdentifier, ControllerServiceDTO> innerMap = controllerServiceMap.get(nodeControllerService.getId());
+ if (innerMap == null) {
+ innerMap = new HashMap<>();
+ controllerServiceMap.put(nodeControllerService.getId(), innerMap);
+ }
+
+ innerMap.put(nodeResponse.getNodeId(), nodeControllerService);
+ }
+ }
+
+ for (final ControllerServiceDTO controllerService : controllerServices) {
+ final String procId = controllerService.getId();
+ final Map<NodeIdentifier, ControllerServiceDTO> mergeMap = controllerServiceMap.get(procId);
+
+ mergeControllerService(controllerService, mergeMap);
+ }
+
+ // create a new client response
+ clientResponse = new NodeResponse(clientResponse, responseEntity);
+ } else if (hasSuccessfulClientResponse && isControllerServiceReferenceEndpoint(uri, method)) {
+ final ControllerServiceReferencingComponentsEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
+ final Set<ControllerServiceReferencingComponentDTO> referencingComponents = responseEntity.getControllerServiceReferencingComponents();
+
+ final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> resultsMap = new HashMap<>();
+ for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+ if (problematicNodeResponses.contains(nodeResponse)) {
+ continue;
+ }
+
+ final ControllerServiceReferencingComponentsEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
+ final Set<ControllerServiceReferencingComponentDTO> nodeReferencingComponents = nodeResponseEntity.getControllerServiceReferencingComponents();
+
+ resultsMap.put(nodeResponse.getNodeId(), nodeReferencingComponents);
+ }
+ mergeControllerServiceReferences(referencingComponents, resultsMap);
+
+ clientResponse = new NodeResponse(clientResponse, responseEntity);
+ } else if (hasSuccessfulClientResponse && isReportingTaskEndpoint(uri, method)) {
+ final ReportingTaskEntity responseEntity = clientResponse.getClientResponse().getEntity(ReportingTaskEntity.class);
+ final ReportingTaskDTO reportingTask = responseEntity.getReportingTask();
+
+ final Map<NodeIdentifier, ReportingTaskDTO> resultsMap = new HashMap<>();
+ for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+ if (problematicNodeResponses.contains(nodeResponse)) {
+ continue;
+ }
+
+ final ReportingTaskEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTaskEntity.class);
+ final ReportingTaskDTO nodeReportingTask = nodeResponseEntity.getReportingTask();
+
+ resultsMap.put(nodeResponse.getNodeId(), nodeReportingTask);
+ }
+ mergeReportingTask(reportingTask, resultsMap);
+
+ clientResponse = new NodeResponse(clientResponse, responseEntity);
+ } else if (hasSuccessfulClientResponse && isReportingTasksEndpoint(uri, method)) {
+ final ReportingTasksEntity responseEntity = clientResponse.getClientResponse().getEntity(ReportingTasksEntity.class);
+ final Set<ReportingTaskDTO> reportingTaskSet = responseEntity.getReportingTasks();
+
+ final Map<String, Map<NodeIdentifier, ReportingTaskDTO>> reportingTaskMap = new HashMap<>();
+ for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+ if (problematicNodeResponses.contains(nodeResponse)) {
+ continue;
+ }
+
+ final ReportingTasksEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTasksEntity.class);
+ final Set<ReportingTaskDTO> nodeReportingTasks = nodeResponseEntity.getReportingTasks();
+
+ for (final ReportingTaskDTO nodeReportingTask : nodeReportingTasks) {
+ Map<NodeIdentifier, ReportingTaskDTO> innerMap = reportingTaskMap.get(nodeReportingTask.getId());
+ if (innerMap == null) {
+ innerMap = new HashMap<>();
+ reportingTaskMap.put(nodeReportingTask.getId(), innerMap);
+ }
+
+ innerMap.put(nodeResponse.getNodeId(), nodeReportingTask);
+ }
+ }
+
+ for (final ReportingTaskDTO reportingTask : reportingTaskSet) {
+ final String procId = reportingTask.getId();
+ final Map<NodeIdentifier, ReportingTaskDTO> mergeMap = reportingTaskMap.get(procId);
+
+ mergeReportingTask(reportingTask, mergeMap);
+ }
+
+ // create a new client response
+ clientResponse = new NodeResponse(clientResponse, responseEntity);
} else {
if (!nodeResponsesToDrain.isEmpty()) {
drainResponses(nodeResponsesToDrain);
@@ -2616,36 +3255,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
logger.warn("All nodes failed to process URI {}. As a result, no node will be disconnected from cluster", uri);
}
}
-
- // if at least one node satisfied the request, then audit the action
- if (hasClientResponse) {
- try {
- // get the cluster context from the response header
- final String serializedClusterContext = clientResponse.getClientResponse().getHeaders().getFirst(CLUSTER_CONTEXT_HTTP_HEADER);
- if (StringUtils.isNotBlank(serializedClusterContext)) {
- // deserialize object
- final Serializable clusterContextObj = WebUtils.deserializeHexToObject(serializedClusterContext);
-
- // if we have a valid object, audit the actions
- if (clusterContextObj instanceof ClusterContext) {
- final ClusterContext clusterContext = (ClusterContext) clusterContextObj;
- if (auditService != null) {
- try {
- auditService.addActions(clusterContext.getActions());
- } catch (Throwable t) {
- logger.warn("Unable to record actions: " + t.getMessage());
- if (logger.isDebugEnabled()) {
- logger.warn(StringUtils.EMPTY, t);
- }
- }
- }
- revision = clusterContext.getRevision();
- }
- }
- } catch (final ClassNotFoundException cnfe) {
- logger.warn("Classpath issue detected because failed to deserialize cluster context from node response due to: " + cnfe, cnfe);
- }
- }
}
return clientResponse;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
index 7169730..d3cff3b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
@@ -16,7 +16,6 @@
*/
package org.apache.nifi.cluster.spring;
-import java.nio.file.Paths;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.cluster.event.EventManager;
import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
@@ -26,11 +25,11 @@ import org.apache.nifi.cluster.manager.HttpResponseMapper;
import org.apache.nifi.cluster.manager.impl.WebClusterManager;
import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderListener;
import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster;
-import org.apache.nifi.controller.service.ControllerServiceLoader;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.io.socket.multicast.DiscoverableService;
import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.OptimisticLockingManager;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.context.ApplicationContext;
@@ -50,6 +49,8 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
private NiFiProperties properties;
private StringEncryptor encryptor;
+
+ private OptimisticLockingManager optimisticLockingManager;
@Override
public Object getObject() throws Exception {
@@ -62,13 +63,6 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
*/
return null;
} else if (clusterManager == null) {
-
- // get the service configuration path (fail early)
- final String serviceConfigurationFile = properties.getProperty(NiFiProperties.SERVICE_CONFIGURATION_FILE);
- if (serviceConfigurationFile == null) {
- throw new NullPointerException("The service configuration file has not been specified.");
- }
-
final HttpRequestReplicator requestReplicator = applicationContext.getBean("httpRequestReplicator", HttpRequestReplicator.class);
final HttpResponseMapper responseMapper = applicationContext.getBean("httpResponseMapper", HttpResponseMapper.class);
final DataFlowManagementService dataFlowService = applicationContext.getBean("dataFlowManagementService", DataFlowManagementService.class);
@@ -81,7 +75,8 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
dataFlowService,
senderListener,
properties,
- encryptor
+ encryptor,
+ optimisticLockingManager
);
// set the service broadcaster
@@ -106,10 +101,6 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
// set the audit service
clusterManager.setAuditService(applicationContext.getBean("auditService", AuditService.class));
-
- // load the controller services
- final ControllerServiceLoader serviceLoader = new ControllerServiceLoader(Paths.get(serviceConfigurationFile));
- serviceLoader.loadControllerServices(clusterManager);
}
return clusterManager;
}
@@ -136,4 +127,8 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
public void setEncryptor(final StringEncryptor encryptor) {
this.encryptor = encryptor;
}
+
+ public void setOptimisticLockingManager(OptimisticLockingManager optimisticLockingManager) {
+ this.optimisticLockingManager = optimisticLockingManager;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
index 68c29bc..72c7bff 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
@@ -91,10 +91,14 @@
<property name="properties" ref="nifiProperties"/>
</bean>
+ <!-- cluster manager optimistic locking manager -->
+ <bean id="clusterManagerOptimisticLockingManager" class="org.apache.nifi.web.StandardOptimisticLockingManager"/>
+
<!-- cluster manager -->
<bean id="clusterManager" class="org.apache.nifi.cluster.spring.WebClusterManagerFactoryBean">
<property name="properties" ref="nifiProperties"/>
<property name="encryptor" ref="stringEncryptor"/>
+ <property name="optimisticLockingManager" ref="clusterManagerOptimisticLockingManager"/>
</bean>
<!-- discoverable services -->
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/.gitignore
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/.gitignore b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/.gitignore
index ea8c4bf..29546b5 100755
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/.gitignore
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/.gitignore
@@ -1 +1,2 @@
/target
+/target/
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
index ef4b72a..c44161f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
@@ -23,6 +23,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
@@ -149,6 +150,16 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
final PropertyDescriptor descriptor = component.getPropertyDescriptor(name);
String value = null;
if (!descriptor.isRequired() && (value = properties.remove(descriptor)) != null) {
+
+ if ( descriptor.getControllerServiceDefinition() != null ) {
+ if (value != null) {
+ final ControllerServiceNode oldNode = serviceProvider.getControllerServiceNode(value);
+ if (oldNode != null) {
+ oldNode.removeReference(this);
+ }
+ }
+ }
+
component.onPropertyModified(descriptor, value, null);
return true;
}
@@ -250,12 +261,17 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
return true;
}
+
@Override
public Collection<ValidationResult> getValidationErrors() {
+ return getValidationErrors(Collections.<String>emptySet());
+ }
+
+ public Collection<ValidationResult> getValidationErrors(final Set<String> serviceIdentifiersNotToValidate) {
final List<ValidationResult> results = new ArrayList<>();
lock.lock();
try {
- final ValidationContext validationContext = validationContextFactory.newValidationContext(getProperties(), getAnnotationData());
+ final ValidationContext validationContext = validationContextFactory.newValidationContext(serviceIdentifiersNotToValidate, getProperties(), getAnnotationData());
final Collection<ValidationResult> validationResults;
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Availability.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Availability.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Availability.java
deleted file mode 100644
index 38df6f7..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Availability.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-public enum Availability {
-
- CLUSTER_MANAGER_ONLY,
- NODE_ONLY,
- BOTH;
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
index 303f540..c3b6613 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
@@ -19,8 +19,7 @@ package org.apache.nifi.controller;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
-import org.apache.nifi.processor.annotation.OnScheduled;
-import org.apache.nifi.processor.annotation.OnUnscheduled;
+import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.scheduling.SchedulingStrategy;
public interface ProcessScheduler {
@@ -143,4 +142,28 @@ public interface ProcessScheduler {
* @param procNode
*/
void yield(ProcessorNode procNode);
+
+ /**
+ * Stops scheduling the given Reporting Task to run.
+ * @param taskNode the Reporting Task that should stop being scheduled
+ */
+ void unschedule(ReportingTaskNode taskNode);
+
+ /**
+ * Begins scheduling the given Reporting Task to run.
+ * @param taskNode the Reporting Task that should begin being scheduled
+ */
+ void schedule(ReportingTaskNode taskNode);
+
+ /**
+ * Enables the Controller Service so that it can be used by Reporting Tasks and Processors.
+ * @param service the Controller Service to enable
+ */
+ void enableControllerService(ControllerServiceNode service);
+
+ /**
+ * Disables the Controller Service so that it can be updated.
+ * @param service the Controller Service to disable
+ */
+ void disableControllerService(ControllerServiceNode service);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index f6786fa..3189edd 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -21,6 +21,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.processor.Processor;
@@ -77,4 +78,19 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen
public abstract void setStyle(Map<String, String> style);
+ /**
+ * Returns the number of threads (concurrent tasks) currently being used by this Processor.
+ * @return the number of threads currently in use by this Processor
+ */
+ public abstract int getActiveThreadCount();
+
+ /**
+ * Verifies that this Processor can be started if the provided set of
+ * services are enabled. This is introduced because we need to verify that all components
+ * can be started before starting any of them. In order to do that, we need to know that this
+ * component can be started if the given services are enabled, as we will then enable the given
+ * services before starting this component.
+ * @param ignoredReferences the services to treat as enabled when performing this verification
+ */
+ public abstract void verifyCanStart(Set<ControllerServiceNode> ignoredReferences);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
index fa48cb3..c932f30 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
@@ -16,18 +16,16 @@
*/
package org.apache.nifi.controller;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
+import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
public interface ReportingTaskNode extends ConfiguredComponent {
- Availability getAvailability();
-
- void setAvailability(Availability availability);
-
void setSchedulingStrategy(SchedulingStrategy schedulingStrategy);
SchedulingStrategy getSchedulingStrategy();
@@ -53,6 +51,12 @@ public interface ReportingTaskNode extends ConfiguredComponent {
ConfigurationContext getConfigurationContext();
boolean isRunning();
+
+ /**
+ * Returns the number of threads (concurrent tasks) currently being used by this ReportingTask.
+ * @return the number of threads currently in use by this ReportingTask
+ */
+ int getActiveThreadCount();
/**
* Indicates the {@link ScheduledState} of this <code>ReportingTask</code>. A
@@ -68,6 +72,20 @@ public interface ReportingTaskNode extends ConfiguredComponent {
void setScheduledState(ScheduledState state);
+ String getComments();
+
+ void setComments(String comment);
+
+ /**
+ * Verifies that this Reporting Task can be started if the provided set of
+ * services are enabled. This is introduced because we need to verify that all components
+ * can be started before starting any of them. In order to do that, we need to know that this
+ * component can be started if the given services are enabled, as we will then enable the given
+ * services before starting this component.
+ * @param ignoredReferences the services to treat as enabled when performing this verification
+ */
+ void verifyCanStart(Set<ControllerServiceNode> ignoredReferences);
+
void verifyCanStart();
void verifyCanStop();
void verifyCanDisable();