You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/04/22 17:46:49 UTC
[06/11] incubator-nifi git commit: NIFI-271 checkpoint
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index ec25ab1..07e754e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -457,11 +457,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
rootGroup.setName(DEFAULT_ROOT_GROUP_NAME);
instanceId = UUID.randomUUID().toString();
- if (remoteInputSocketPort == null){
+ if (remoteInputSocketPort == null) {
LOG.info("Not enabling Site-to-Site functionality because nifi.remote.input.socket.port is not set");
externalSiteListener = null;
} else if (isSiteToSiteSecure && sslContext == null) {
- LOG.error("Unable to create Secure Site-to-Site Listener because not all required Keystore/Truststore Properties are set. Site-to-Site functionality will be disabled until this problem is has been fixed.");
+ LOG.error("Unable to create Secure Site-to-Site Listener because not all required Keystore/Truststore "
+ + "Properties are set. Site-to-Site functionality will be disabled until this problem is has been fixed.");
externalSiteListener = null;
} else {
// Register the SocketFlowFileServerProtocol as the appropriate resource for site-to-site Server Protocol
@@ -530,7 +531,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
};
}
-
+
public void initializeFlow() throws IOException {
writeLock.lock();
try {
@@ -584,17 +585,19 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* Causes any processors that were added to the flow with a 'delayStart'
* flag of true to now start
* </p>
+ *
+ * @param startDelayedComponents true if start
*/
public void onFlowInitialized(final boolean startDelayedComponents) {
writeLock.lock();
try {
- if ( startDelayedComponents ) {
+ if (startDelayedComponents) {
LOG.info("Starting {} processors/ports/funnels", (startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size()));
for (final Connectable connectable : startConnectablesAfterInitialization) {
if (connectable.getScheduledState() == ScheduledState.DISABLED) {
continue;
}
-
+
try {
if (connectable instanceof ProcessorNode) {
connectable.getProcessGroup().startProcessor((ProcessorNode) connectable);
@@ -603,14 +606,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
} catch (final Throwable t) {
LOG.error("Unable to start {} due to {}", new Object[]{connectable, t.toString()});
- if ( LOG.isDebugEnabled() ) {
+ if (LOG.isDebugEnabled()) {
LOG.error("", t);
}
}
}
-
+
startConnectablesAfterInitialization.clear();
-
+
int startedTransmitting = 0;
for (final RemoteGroupPort remoteGroupPort : startRemoteGroupPortsAfterInitialization) {
try {
@@ -620,7 +623,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
LOG.error("Unable to start transmitting with {} due to {}", new Object[]{remoteGroupPort, t});
}
}
-
+
LOG.info("Started {} Remote Group Ports transmitting", startedTransmitting);
startRemoteGroupPortsAfterInitialization.clear();
} else {
@@ -635,7 +638,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
LOG.error("Unable to start {} due to {}", new Object[]{connectable, t});
}
}
-
+
startConnectablesAfterInitialization.clear();
startRemoteGroupPortsAfterInitialization.clear();
}
@@ -720,9 +723,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
/**
* Creates a new Label
*
- * @param id
- * @param text
- * @return
+ * @param id identifier
+ * @param text label text
+ * @return new label
* @throws NullPointerException if either argument is null
*/
public Label createLabel(final String id, final String text) {
@@ -732,8 +735,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
/**
* Creates a funnel
*
- * @param id
- * @return
+ * @param id funnel id
+ * @return new funnel
*/
public Funnel createFunnel(final String id) {
return new StandardFunnel(id.intern(), null, processScheduler);
@@ -742,9 +745,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
/**
* Creates a Port to use as an Input Port for a Process Group
*
- * @param id
- * @param name
- * @return
+ * @param id port identifier
+ * @param name port name
+ * @return new port
* @throws NullPointerException if the ID or name is not unique
* @throws IllegalStateException if an Input Port already exists with the
* same name or id.
@@ -759,9 +762,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
/**
* Creates a Port to use as an Output Port for a Process Group
*
- * @param id
- * @param name
- * @return
+ * @param id port id
+ * @param name port name
+ * @return new port
* @throws NullPointerException if the ID or name is not unique
* @throws IllegalStateException if an Input Port already exists with the
* same name or id.
@@ -776,8 +779,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
/**
* Creates a ProcessGroup with the given ID
*
- * @param id
- * @return
+ * @param id group id
+ * @return new group
* @throws NullPointerException if the argument is null
*/
public ProcessGroup createProcessGroup(final String id) {
@@ -786,13 +789,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
/**
* <p>
- * Creates a new ProcessorNode with the given type and identifier and initializes it invoking the
- * methods annotated with {@link OnAdded}.
+ * Creates a new ProcessorNode with the given type and identifier and
+ * initializes it invoking the methods annotated with {@link OnAdded}.
* </p>
*
- * @param type
- * @param id
- * @return
+ * @param type processor type
+ * @param id processor id
+ * @return new processor
* @throws NullPointerException if either arg is null
* @throws ProcessorInstantiationException if the processor cannot be
* instantiated for any reason
@@ -800,17 +803,19 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public ProcessorNode createProcessor(final String type, String id) throws ProcessorInstantiationException {
return createProcessor(type, id, true);
}
-
+
/**
* <p>
- * Creates a new ProcessorNode with the given type and identifier and optionally initializes it.
+ * Creates a new ProcessorNode with the given type and identifier and
+ * optionally initializes it.
* </p>
*
* @param type the fully qualified Processor class name
* @param id the unique ID of the Processor
- * @param firstTimeAdded whether or not this is the first time this Processor is added to the graph. If {@code true},
- * will invoke methods annotated with the {@link OnAdded} annotation.
- * @return
+ * @param firstTimeAdded whether or not this is the first time this
+ * Processor is added to the graph. If {@code true}, will invoke methods
+ * annotated with the {@link OnAdded} annotation.
+ * @return new processor node
* @throws NullPointerException if either arg is null
* @throws ProcessorInstantiationException if the processor cannot be
* instantiated for any reason
@@ -825,7 +830,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode));
- if ( firstTimeAdded ) {
+ if (firstTimeAdded) {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, org.apache.nifi.processor.annotation.OnAdded.class, processor);
} catch (final Exception e) {
@@ -886,9 +891,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
/**
- * Gets the BulletinRepository for storing and retrieving Bulletins.
- *
- * @return
+ * @return the BulletinRepository for storing and retrieving Bulletins
*/
public BulletinRepository getBulletinRepository() {
return bulletinRepository;
@@ -902,9 +905,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* Creates a Port to use as an Input Port for the root Process Group, which
* is used for Site-to-Site communications
*
- * @param id
- * @param name
- * @return
+ * @param id port id
+ * @param name port name
+ * @return new port
* @throws NullPointerException if the ID or name is not unique
* @throws IllegalStateException if an Input Port already exists with the
* same name or id.
@@ -913,7 +916,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
id = requireNonNull(id).intern();
name = requireNonNull(name).intern();
verifyPortIdDoesNotExist(id);
- return new StandardRootGroupPort(id, name, null, TransferDirection.RECEIVE, ConnectableType.INPUT_PORT, userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure));
+ return new StandardRootGroupPort(id, name, null, TransferDirection.RECEIVE, ConnectableType.INPUT_PORT,
+ userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure));
}
/**
@@ -921,9 +925,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* is used for Site-to-Site communications and will queue flow files waiting
* to be delivered to remote instances
*
- * @param id
- * @param name
- * @return
+ * @param id port id
+ * @param name port name
+ * @return new port
* @throws NullPointerException if the ID or name is not unique
* @throws IllegalStateException if an Input Port already exists with the
* same name or id.
@@ -932,17 +936,17 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
id = requireNonNull(id).intern();
name = requireNonNull(name).intern();
verifyPortIdDoesNotExist(id);
- return new StandardRootGroupPort(id, name, null, TransferDirection.SEND, ConnectableType.OUTPUT_PORT, userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure));
+ return new StandardRootGroupPort(id, name, null, TransferDirection.SEND, ConnectableType.OUTPUT_PORT,
+ userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure));
}
/**
* Creates a new Remote Process Group with the given ID that points to the
* given URI
*
- * @param id
- * @param uri
- * @return
- *
+ * @param id group id
+ * @param uri group uri
+ * @return new group
* @throws NullPointerException if either argument is null
* @throws IllegalArgumentException if <code>uri</code> is not a valid URI.
*/
@@ -954,8 +958,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* Verifies that no output port exists with the given id or name. If this
* does not hold true, throws an IllegalStateException
*
- * @param id
- * @throws IllegalStateException
+ * @param id port identifier
+ * @throws IllegalStateException port already exists
*/
private void verifyPortIdDoesNotExist(final String id) {
Port port = rootGroup.findOutputPort(id);
@@ -985,7 +989,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* Sets the name for the Root Group, which also changes the name for the
* controller.
*
- * @param name
+ * @param name of root group
*/
public void setName(final String name) {
readLock.lock();
@@ -997,10 +1001,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
/**
- * Gets the comments of this controller, which is also the comment of the
- * Root Group.
- *
- * @return
+ * @return the comments of this controller, which is also the comment of the
+ * Root Group
*/
public String getComments() {
readLock.lock();
@@ -1012,10 +1014,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
/**
- * Sets the comment for the Root Group, which also changes the comment for
- * the controller.
+ * Sets the comments
*
- * @param comments
+ * @param comments for the Root Group, which also changes the comment for
+ * the controller
*/
public void setComments(final String comments) {
readLock.lock();
@@ -1075,23 +1077,23 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// Trigger any processors' methods marked with @OnShutdown to be called
rootGroup.shutdown();
-
+
// invoke any methods annotated with @OnShutdown on Controller Services
- for ( final ControllerServiceNode serviceNode : getAllControllerServices() ) {
+ for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
final ConfigurationContext configContext = new StandardConfigurationContext(serviceNode, controllerServiceProvider);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, serviceNode.getControllerServiceImplementation(), configContext);
}
}
-
+
// invoke any methods annotated with @OnShutdown on Reporting Tasks
- for ( final ReportingTaskNode taskNode : getAllReportingTasks() ) {
+ for (final ReportingTaskNode taskNode : getAllReportingTasks()) {
final ConfigurationContext configContext = taskNode.getConfigurationContext();
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, taskNode.getReportingTask(), configContext);
}
}
-
+
try {
this.timerDrivenEngineRef.get().awaitTermination(gracefulShutdownSeconds / 2, TimeUnit.SECONDS);
this.eventDrivenEngineRef.get().awaitTermination(gracefulShutdownSeconds / 2, TimeUnit.SECONDS);
@@ -1108,7 +1110,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
if (this.timerDrivenEngineRef.get().isTerminated() && eventDrivenEngineRef.get().isTerminated()) {
LOG.info("Controller has been terminated successfully.");
} else {
- LOG.warn("Controller hasn't terminated properly. There exists an uninterruptable thread that will take an indeterminate amount of time to stop. Might need to kill the program manually.");
+ LOG.warn("Controller hasn't terminated properly. There exists an uninterruptable thread that "
+ + "will take an indeterminate amount of time to stop. Might need to kill the program manually.");
}
if (externalSiteListener != null) {
@@ -1118,24 +1121,24 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
if (flowFileSwapManager != null) {
flowFileSwapManager.shutdown();
}
-
- if ( processScheduler != null ) {
- processScheduler.shutdown();
+
+ if (processScheduler != null) {
+ processScheduler.shutdown();
}
-
- if ( contentRepository != null ) {
+
+ if (contentRepository != null) {
contentRepository.shutdown();
}
-
- if ( provenanceEventRepository != null ) {
- try {
- provenanceEventRepository.close();
- } catch (final IOException ioe) {
- LOG.warn("There was a problem shutting down the Provenance Repository: " + ioe.toString());
- if ( LOG.isDebugEnabled() ) {
- LOG.warn("", ioe);
- }
- }
+
+ if (provenanceEventRepository != null) {
+ try {
+ provenanceEventRepository.close();
+ } catch (final IOException ioe) {
+ LOG.warn("There was a problem shutting down the Provenance Repository: " + ioe.toString());
+ if (LOG.isDebugEnabled()) {
+ LOG.warn("", ioe);
+ }
+ }
}
} finally {
writeLock.unlock();
@@ -1145,8 +1148,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
/**
* Serializes the current state of the controller to the given OutputStream
*
- * @param serializer
- * @param os
+ * @param serializer serializer
+ * @param os stream
* @throws FlowSerializationException if serialization of the flow fails for
* any reason
*/
@@ -1165,7 +1168,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* For more details, see
* {@link FlowSynchronizer#sync(FlowController, DataFlow)}.
*
- * @param synchronizer
+ * @param synchronizer synchronizer
* @param dataFlow the flow to load the controller with. If the flow is null
* or zero length, then the controller must not have a flow or else an
* UninheritableFlowException will be thrown.
@@ -1294,8 +1297,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* in DTO that is <code>null</code> (with the exception of the required ID)
* will be ignored.
*
- * @param dto
- * @return a fully-populated DTO representing the newly updated ProcessGroup
+ * @param dto group
* @throws ProcessorInstantiationException
*
* @throws IllegalStateException if no process group can be found with the
@@ -1331,7 +1333,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* part of the current flow. This is going create a template based on a
* snippet of this flow.
*
- * @param dto
+ * @param dto template
* @return a copy of the given DTO
* @throws IOException if an I/O error occurs when persisting the Template
* @throws NullPointerException if the DTO is null
@@ -1346,7 +1348,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
/**
* Removes all templates from this controller
*
- * @throws IOException
+ * @throws IOException ioe
*/
public void clearTemplates() throws IOException {
templateManager.clear();
@@ -1356,20 +1358,18 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* Imports the specified template into this controller. The contents of this
* template may have come from another NiFi instance.
*
- * @param dto
- * @return
- * @throws IOException
+ * @param dto dto
+ * @return template
+ * @throws IOException ioe
*/
public Template importTemplate(final TemplateDTO dto) throws IOException {
return templateManager.importTemplate(dto);
}
/**
- * Returns the template with the given ID, or <code>null</code> if no
- * template exists with the given ID.
- *
- * @param id
- * @return
+ * @param id identifier
+ * @return the template with the given ID, or <code>null</code> if no
+ * template exists with the given ID
*/
public Template getTemplate(final String id) {
return templateManager.getTemplate(id);
@@ -1380,9 +1380,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
/**
- * Returns all templates that this controller knows about.
- *
- * @return
+ * @return all templates that this controller knows about
*/
public Collection<Template> getTemplates() {
return templateManager.getTemplates();
@@ -1411,8 +1409,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* Creates an instance of the given snippet and adds the components to the
* given group
*
- * @param group
- * @param dto
+ * @param group group
+ * @param dto dto
*
* @throws NullPointerException if either argument is null
* @throws IllegalStateException if the snippet is not valid because a
@@ -1432,27 +1430,27 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
//
// Instantiate Controller Services
//
- for ( final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices() ) {
+ for (final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices()) {
final ControllerServiceNode serviceNode = createControllerService(controllerServiceDTO.getType(), controllerServiceDTO.getId(), true);
-
+
serviceNode.setAnnotationData(controllerServiceDTO.getAnnotationData());
serviceNode.setComments(controllerServiceDTO.getComments());
serviceNode.setName(controllerServiceDTO.getName());
}
-
+
// configure controller services. We do this after creating all of them in case 1 service
// references another service.
- for ( final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices() ) {
+ for (final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices()) {
final String serviceId = controllerServiceDTO.getId();
final ControllerServiceNode serviceNode = getControllerServiceNode(serviceId);
-
- for ( final Map.Entry<String, String> entry : controllerServiceDTO.getProperties().entrySet() ) {
- if ( entry.getValue() != null ) {
+
+ for (final Map.Entry<String, String> entry : controllerServiceDTO.getProperties().entrySet()) {
+ if (entry.getValue() != null) {
serviceNode.setProperty(entry.getKey(), entry.getValue());
}
}
}
-
+
//
// Instantiate the labels
//
@@ -1467,7 +1465,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
group.addLabel(label);
}
- //
// Instantiate the funnels
for (final FunnelDTO funnelDTO : dto.getFunnels()) {
final Funnel funnel = createFunnel(funnelDTO.getId());
@@ -1604,7 +1601,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
group.addRemoteProcessGroup(remoteGroup);
}
- //
+ //
// Instantiate ProcessGroups
//
for (final ProcessGroupDTO groupDTO : dto.getProcessGroups()) {
@@ -1640,12 +1637,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final Connectable source;
final Connectable destination;
- // locate the source and destination connectable. if this is a remote port
- // we need to locate the remote process groups. otherwise we need to
+ // locate the source and destination connectable. if this is a remote port
+ // we need to locate the remote process groups. otherwise we need to
// find the connectable given its parent group.
// NOTE: (getConnectable returns ANY connectable, when the parent is
- // not this group only input ports or output ports should be returned. if something
- // other than a port is returned, an exception will be thrown when adding the
+ // not this group only input ports or output ports should be returned. if something
+ // other than a port is returned, an exception will be thrown when adding the
// connection below.)
// see if the source connectable is a remote port
if (ConnectableType.REMOTE_OUTPUT_PORT.name().equals(sourceDTO.getType())) {
@@ -1711,8 +1708,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
/**
* Converts a set of ports into a set of remote process group ports.
*
- * @param ports
- * @return
+ * @param ports ports
+ * @return group descriptors
*/
private Set<RemoteProcessGroupPortDescriptor> convertRemotePort(final Set<RemoteProcessGroupPortDTO> ports) {
Set<RemoteProcessGroupPortDescriptor> remotePorts = null;
@@ -1738,8 +1735,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* Returns the parent of the specified Connectable. This only considers this
* group and any direct child sub groups.
*
- * @param parentGroupId
- * @return
+ * @param parentGroupId group id
+ * @return parent group
*/
private ProcessGroup getConnectableParent(final ProcessGroup group, final String parentGroupId) {
if (areGroupsSame(group.getIdentifier(), parentGroupId)) {
@@ -1770,8 +1767,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* {@link ProcessorInstantiationException} will be thrown.
* </p>
*
- * @param group
- * @param templateContents
+ * @param group group
+ * @param templateContents contents
*/
private void validateSnippetContents(final ProcessGroup group, final FlowSnippetDTO templateContents) {
// validate the names of Input Ports
@@ -1816,7 +1813,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
throw new IllegalStateException("Invalid Processor Type: " + proc.getType());
}
}
-
+
final Set<ControllerServiceDTO> controllerServices = templateContents.getControllerServices();
if (controllerServices != null) {
for (final ControllerServiceDTO service : controllerServices) {
@@ -1841,8 +1838,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
/**
* Recursively finds all ProcessorDTO's
*
- * @param group
- * @return
+ * @param group group
+ * @return processor dto set
*/
private Set<ProcessorDTO> findAllProcessors(final ProcessGroupDTO group) {
final Set<ProcessorDTO> procs = new HashSet<>();
@@ -1859,8 +1856,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
/**
* Recursively finds all ConnectionDTO's
*
- * @param group
- * @return
+ * @param group group
+ * @return connection dtos
*/
private Set<ConnectionDTO> findAllConnections(final ProcessGroupDTO group) {
final Set<ConnectionDTO> conns = new HashSet<>();
@@ -1879,11 +1876,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
//
/**
* Indicates whether or not the two ID's point to the same ProcessGroup. If
- * either id is null, will return <code>false</code.
+ * either id is null, will return <code>false</code>.
*
- * @param id1
- * @param id2
- * @return
+ * @param id1 group id
+ * @param id2 other group id
+ * @return true if same
*/
public boolean areGroupsSame(final String id1, final String id2) {
if (id1 == null || id2 == null) {
@@ -1999,7 +1996,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
/**
* Returns the ProcessGroup with the given ID
*
- * @param id
+ * @param id group
* @return the process group or null if not group is found
*/
private ProcessGroup lookupGroup(final String id) {
@@ -2013,7 +2010,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
/**
* Returns the ProcessGroup with the given ID
*
- * @param id
+ * @param id group id
* @return the process group or null if not group is found
*/
public ProcessGroup getGroup(final String id) {
@@ -2083,7 +2080,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
bytesSent += procStat.getBytesSent();
}
- // set status for local child groups
+ // set status for local child groups
final Collection<ProcessGroupStatus> localChildGroupStatusCollection = new ArrayList<>();
status.setProcessGroupStatus(localChildGroupStatusCollection);
for (final ProcessGroup childGroup : group.getProcessGroups()) {
@@ -2441,8 +2438,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
// determine the run status and get any validation errors... must check
- // is valid when not disabled since a processors validity could change due
- // to environmental conditions (property configured with a file path and
+ // is valid when not disabled since a processors validity could change due
+ // to environmental conditions (property configured with a file path and
// the file being externally removed)
if (ScheduledState.DISABLED.equals(procNode.getScheduledState())) {
status.setRunStatus(RunStatus.Disabled);
@@ -2548,17 +2545,17 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public ReportingTaskNode createReportingTask(final String type) throws ReportingTaskInstantiationException {
return createReportingTask(type, true);
}
-
+
public ReportingTaskNode createReportingTask(final String type, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
- return createReportingTask(type, UUID.randomUUID().toString(), firstTimeAdded);
+ return createReportingTask(type, UUID.randomUUID().toString(), firstTimeAdded);
}
-
+
@Override
public ReportingTaskNode createReportingTask(final String type, final String id, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
if (type == null || id == null) {
throw new NullPointerException();
}
-
+
ReportingTask task = null;
final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
try {
@@ -2585,8 +2582,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider);
final ReportingTaskNode taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory);
taskNode.setName(task.getClass().getSimpleName());
-
- if ( firstTimeAdded ) {
+
+ if (firstTimeAdded) {
final ComponentLog componentLog = new SimpleProcessLogger(id, taskNode.getReportingTask());
final ReportingInitializationContext config = new StandardReportingInitializationContext(id, taskNode.getName(),
SchedulingStrategy.TIMER_DRIVEN, "1 min", componentLog, this);
@@ -2596,14 +2593,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
} catch (final InitializationException ie) {
throw new ReportingTaskInstantiationException("Failed to initialize reporting task of type " + type, ie);
}
-
+
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, task);
} catch (final Exception e) {
throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + task, e);
}
}
-
+
reportingTasks.put(id, taskNode);
return taskNode;
}
@@ -2620,10 +2617,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
reportingTaskNode.verifyCanStart();
- processScheduler.schedule(reportingTaskNode);
+ processScheduler.schedule(reportingTaskNode);
}
-
@Override
public void stopReportingTask(final ReportingTaskNode reportingTaskNode) {
if (isTerminated()) {
@@ -2637,32 +2633,32 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
@Override
public void removeReportingTask(final ReportingTaskNode reportingTaskNode) {
final ReportingTaskNode existing = reportingTasks.get(reportingTaskNode.getIdentifier());
- if ( existing == null || existing != reportingTaskNode ) {
+ if (existing == null || existing != reportingTaskNode) {
throw new IllegalStateException("Reporting Task " + reportingTaskNode + " does not exist in this Flow");
}
-
+
reportingTaskNode.verifyCanDelete();
-
+
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, reportingTaskNode.getReportingTask(), reportingTaskNode.getConfigurationContext());
}
-
- for ( final Map.Entry<PropertyDescriptor, String> entry : reportingTaskNode.getProperties().entrySet() ) {
+
+ for (final Map.Entry<PropertyDescriptor, String> entry : reportingTaskNode.getProperties().entrySet()) {
final PropertyDescriptor descriptor = entry.getKey();
- if (descriptor.getControllerServiceDefinition() != null ) {
+ if (descriptor.getControllerServiceDefinition() != null) {
final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue();
- if ( value != null ) {
+ if (value != null) {
final ControllerServiceNode serviceNode = controllerServiceProvider.getControllerServiceNode(value);
- if ( serviceNode != null ) {
+ if (serviceNode != null) {
serviceNode.removeReference(reportingTaskNode);
}
}
}
}
-
+
reportingTasks.remove(reportingTaskNode.getIdentifier());
}
-
+
@Override
public Set<ReportingTaskNode> getAllReportingTasks() {
return new HashSet<>(reportingTasks.values());
@@ -2672,60 +2668,60 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
return controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
}
-
+
@Override
public void enableReportingTask(final ReportingTaskNode reportingTaskNode) {
reportingTaskNode.verifyCanEnable();
processScheduler.enableReportingTask(reportingTaskNode);
}
-
+
@Override
public void disableReportingTask(final ReportingTaskNode reportingTaskNode) {
reportingTaskNode.verifyCanDisable();
processScheduler.disableReportingTask(reportingTaskNode);
}
-
+
@Override
public void disableReferencingServices(final ControllerServiceNode serviceNode) {
controllerServiceProvider.disableReferencingServices(serviceNode);
}
-
+
@Override
public void enableReferencingServices(final ControllerServiceNode serviceNode) {
controllerServiceProvider.enableReferencingServices(serviceNode);
}
-
+
@Override
public void scheduleReferencingComponents(final ControllerServiceNode serviceNode) {
controllerServiceProvider.scheduleReferencingComponents(serviceNode);
}
-
+
@Override
public void unscheduleReferencingComponents(final ControllerServiceNode serviceNode) {
controllerServiceProvider.unscheduleReferencingComponents(serviceNode);
}
-
+
@Override
public void enableControllerService(final ControllerServiceNode serviceNode) {
controllerServiceProvider.enableControllerService(serviceNode);
}
-
+
@Override
public void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes) {
controllerServiceProvider.enableControllerServices(serviceNodes);
}
-
+
@Override
public void disableControllerService(final ControllerServiceNode serviceNode) {
serviceNode.verifyCanDisable();
controllerServiceProvider.disableControllerService(serviceNode);
}
-
+
@Override
public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) {
controllerServiceProvider.verifyCanEnableReferencingServices(serviceNode);
}
-
+
@Override
public void verifyCanScheduleReferencingComponents(final ControllerServiceNode serviceNode) {
controllerServiceProvider.verifyCanScheduleReferencingComponents(serviceNode);
@@ -2735,12 +2731,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public void verifyCanDisableReferencingServices(final ControllerServiceNode serviceNode) {
controllerServiceProvider.verifyCanDisableReferencingServices(serviceNode);
}
-
+
@Override
public void verifyCanStopReferencingComponents(final ControllerServiceNode serviceNode) {
controllerServiceProvider.verifyCanStopReferencingComponents(serviceNode);
}
-
+
@Override
public ControllerService getControllerService(final String serviceIdentifier) {
return controllerServiceProvider.getControllerService(serviceIdentifier);
@@ -2765,21 +2761,22 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public boolean isControllerServiceEnabling(final String serviceIdentifier) {
return controllerServiceProvider.isControllerServiceEnabling(serviceIdentifier);
}
-
+
@Override
public String getControllerServiceName(final String serviceIdentifier) {
- return controllerServiceProvider.getControllerServiceName(serviceIdentifier);
+ return controllerServiceProvider.getControllerServiceName(serviceIdentifier);
}
+ @Override
public void removeControllerService(final ControllerServiceNode serviceNode) {
controllerServiceProvider.removeControllerService(serviceNode);
}
-
+
@Override
public Set<ControllerServiceNode> getAllControllerServices() {
- return controllerServiceProvider.getAllControllerServices();
+ return controllerServiceProvider.getAllControllerServices();
}
-
+
//
// Counters
//
@@ -2842,7 +2839,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* Starts heartbeating to the cluster. May only be called if the instance
* was constructed for a clustered environment.
*
- * @throws IllegalStateException
+ * @throws IllegalStateException if not configured for clustering
*/
public void startHeartbeating() throws IllegalStateException {
if (!configuredForClustering) {
@@ -2893,7 +2890,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* constructed for a clustered environment. If the controller was not
* heartbeating, then this method has no effect.
*
- * @throws IllegalStateException
+ * @throws IllegalStateException if not clustered
*/
public void stopHeartbeating() throws IllegalStateException {
@@ -2925,9 +2922,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
/**
- * Returns true if the instance is heartbeating; false otherwise.
- *
- * @return
+ * @return true if the instance is heartbeating; false otherwise
*/
public boolean isHeartbeating() {
readLock.lock();
@@ -2940,9 +2935,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
/**
- * Returns the number of seconds to wait between successive heartbeats.
- *
- * @return
+ * @return the number of seconds to wait between successive heartbeats
*/
public int getHeartbeatDelaySeconds() {
readLock.lock();
@@ -2996,12 +2989,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
/**
- * Returns the DN of the Cluster Manager that we are currently connected to,
+ * @return the DN of the Cluster Manager that we are currently connected to,
* if available. This will return null if the instance is not clustered or
* if the instance is clustered but the NCM's DN is not available - for
- * instance, if cluster communications are not secure.
- *
- * @return
+ * instance, if cluster communications are not secure
*/
public String getClusterManagerDN() {
readLock.lock();
@@ -3016,7 +3007,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* Sets whether this instance is clustered. Clustered means that a node is
* either connected or trying to connect to the cluster.
*
- * @param clustered
+ * @param clustered true if clustered
* @param clusterInstanceId if clustered is true, indicates the InstanceID
* of the Cluster Manager
*/
@@ -3028,7 +3019,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* Sets whether this instance is clustered. Clustered means that a node is
* either connected or trying to connect to the cluster.
*
- * @param clustered
+ * @param clustered true if clustered
* @param clusterInstanceId if clustered is true, indicates the InstanceID
* of the Cluster Manager
* @param clusterManagerDn the DN of the NCM
@@ -3288,7 +3279,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return "Failed to determine whether or not content was available in Content Repository due to " + ioe.toString();
}
- // Make sure that the source queue exists
+ // Make sure that the source queue exists
if (event.getSourceQueueIdentifier() == null) {
return "Cannot replay data from Provenance Event because the event does not specify the Source FlowFile Queue";
}
@@ -3339,7 +3330,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
throw new IllegalArgumentException("Cannot replay data from Provenance Event because the event does not contain the required Content Claim");
}
- // Make sure that the source queue exists
+ // Make sure that the source queue exists
if (event.getSourceQueueIdentifier() == null) {
throw new IllegalArgumentException("Cannot replay data from Provenance Event because the event does not specify the Source FlowFile Queue");
}
@@ -3358,7 +3349,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
// Create the ContentClaim
- final ContentClaim claim = contentClaimManager.newContentClaim(event.getPreviousContentClaimContainer(), event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false);
+ final ContentClaim claim = contentClaimManager.newContentClaim(event.getPreviousContentClaimContainer(),
+ event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false);
// Increment Claimant Count, since we will now be referencing the Content Claim
contentClaimManager.incrementClaimantCount(claim);
@@ -3544,7 +3536,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
if (bulletin.getGroupId() == null) {
escapedBulletin = BulletinFactory.createBulletin(bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
} else {
- escapedBulletin = BulletinFactory.createBulletin(bulletin.getGroupId(), bulletin.getSourceId(), bulletin.getSourceName(), bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
+ escapedBulletin = BulletinFactory.createBulletin(bulletin.getGroupId(), bulletin.getSourceId(),
+ bulletin.getSourceName(), bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
}
} else {
escapedBulletin = bulletin;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java
index 85ad159..144395c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java
@@ -79,39 +79,39 @@ public class FlowFromDOMFactory {
return styles;
}
-
+
public static ControllerServiceDTO getControllerService(final Element element, final StringEncryptor encryptor) {
- final ControllerServiceDTO dto = new ControllerServiceDTO();
-
- dto.setId(getString(element, "id"));
- dto.setName(getString(element, "name"));
- dto.setComments(getString(element, "comment"));
- dto.setType(getString(element, "class"));
-
- final boolean enabled = getBoolean(element, "enabled");
- dto.setState(enabled ? ControllerServiceState.ENABLED.name() : ControllerServiceState.DISABLED.name());
-
+ final ControllerServiceDTO dto = new ControllerServiceDTO();
+
+ dto.setId(getString(element, "id"));
+ dto.setName(getString(element, "name"));
+ dto.setComments(getString(element, "comment"));
+ dto.setType(getString(element, "class"));
+
+ final boolean enabled = getBoolean(element, "enabled");
+ dto.setState(enabled ? ControllerServiceState.ENABLED.name() : ControllerServiceState.DISABLED.name());
+
dto.setProperties(getProperties(element, encryptor));
dto.setAnnotationData(getString(element, "annotationData"));
return dto;
}
-
+
public static ReportingTaskDTO getReportingTask(final Element element, final StringEncryptor encryptor) {
- final ReportingTaskDTO dto = new ReportingTaskDTO();
-
- dto.setId(getString(element, "id"));
- dto.setName(getString(element, "name"));
- dto.setComments(getString(element, "comment"));
- dto.setType(getString(element, "class"));
- dto.setSchedulingPeriod(getString(element, "schedulingPeriod"));
- dto.setState(getString(element, "scheduledState"));
- dto.setSchedulingStrategy(getString(element, "schedulingStrategy"));
-
- dto.setProperties(getProperties(element, encryptor));
- dto.setAnnotationData(getString(element, "annotationData"));
-
- return dto;
+ final ReportingTaskDTO dto = new ReportingTaskDTO();
+
+ dto.setId(getString(element, "id"));
+ dto.setName(getString(element, "name"));
+ dto.setComments(getString(element, "comment"));
+ dto.setType(getString(element, "class"));
+ dto.setSchedulingPeriod(getString(element, "schedulingPeriod"));
+ dto.setState(getString(element, "scheduledState"));
+ dto.setSchedulingStrategy(getString(element, "schedulingStrategy"));
+
+ dto.setProperties(getProperties(element, encryptor));
+ dto.setAnnotationData(getString(element, "annotationData"));
+
+ return dto;
}
public static ProcessGroupDTO getProcessGroup(final String parentId, final Element element, final StringEncryptor encryptor) {
@@ -383,7 +383,7 @@ public class FlowFromDOMFactory {
}
private static LinkedHashMap<String, String> getProperties(final Element element, final StringEncryptor encryptor) {
- final LinkedHashMap<String, String> properties = new LinkedHashMap<>();
+ final LinkedHashMap<String, String> properties = new LinkedHashMap<>();
final List<Element> propertyNodeList = getChildrenByTagName(element, "property");
for (final Element propertyElement : propertyNodeList) {
final String name = getString(propertyElement, "name");
@@ -392,7 +392,7 @@ public class FlowFromDOMFactory {
}
return properties;
}
-
+
private static String getString(final Element element, final String childElementName) {
final List<Element> nodeList = getChildrenByTagName(element, childElementName);
if (nodeList == null || nodeList.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java
index 42d7f1c..7cc3039 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java
@@ -42,13 +42,13 @@ public class FlowUnmarshaller {
* Flow Configuration schema and returns a FlowSnippetDTO representing the
* flow
*
- * @param flowContents
- * @param encryptor
- * @return
+ * @param flowContents contents
+ * @param encryptor encryptor
+ * @return snippet dto
* @throws NullPointerException if <code>flowContents</code> is null
- * @throws IOException
- * @throws SAXException
- * @throws ParserConfigurationException
+ * @throws IOException ioe
+ * @throws SAXException sax
+ * @throws ParserConfigurationException pe
*/
public static FlowSnippetDTO unmarshal(final byte[] flowContents, final StringEncryptor encryptor) throws IOException, SAXException, ParserConfigurationException {
if (Objects.requireNonNull(flowContents).length == 0) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
index 7cd9d3b..c6aa395 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
@@ -80,17 +80,17 @@ public class StandardFlowSerializer implements FlowSerializer {
addTextElement(rootNode, "maxTimerDrivenThreadCount", controller.getMaxTimerDrivenThreadCount());
addTextElement(rootNode, "maxEventDrivenThreadCount", controller.getMaxEventDrivenThreadCount());
addProcessGroup(rootNode, controller.getGroup(controller.getRootGroupId()), "rootGroup");
-
+
final Element controllerServicesNode = doc.createElement("controllerServices");
rootNode.appendChild(controllerServicesNode);
- for ( final ControllerServiceNode serviceNode : controller.getAllControllerServices() ) {
- addControllerService(controllerServicesNode, serviceNode, encryptor);
+ for (final ControllerServiceNode serviceNode : controller.getAllControllerServices()) {
+ addControllerService(controllerServicesNode, serviceNode, encryptor);
}
-
+
final Element reportingTasksNode = doc.createElement("reportingTasks");
rootNode.appendChild(reportingTasksNode);
- for ( final ReportingTaskNode taskNode : controller.getAllReportingTasks() ) {
- addReportingTask(reportingTasksNode, taskNode, encryptor);
+ for (final ReportingTaskNode taskNode : controller.getAllReportingTasks()) {
+ addReportingTask(reportingTasksNode, taskNode, encryptor);
}
final DOMSource domSource = new DOMSource(doc);
@@ -314,15 +314,15 @@ public class StandardFlowSerializer implements FlowSerializer {
addTextElement(element, "runDurationNanos", processor.getRunDuration(TimeUnit.NANOSECONDS));
addConfiguration(element, processor.getProperties(), processor.getAnnotationData(), encryptor);
-
+
for (final Relationship rel : processor.getAutoTerminatedRelationships()) {
addTextElement(element, "autoTerminatedRelationship", rel.getName());
}
}
-
+
private static void addConfiguration(final Element element, final Map<PropertyDescriptor, String> properties, final String annotationData, final StringEncryptor encryptor) {
- final Document doc = element.getOwnerDocument();
- for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
+ final Document doc = element.getOwnerDocument();
+ for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
final PropertyDescriptor descriptor = entry.getKey();
String value = entry.getValue();
@@ -406,38 +406,37 @@ public class StandardFlowSerializer implements FlowSerializer {
parentElement.appendChild(element);
}
-
public static void addControllerService(final Element element, final ControllerServiceNode serviceNode, final StringEncryptor encryptor) {
- final Element serviceElement = element.getOwnerDocument().createElement("controllerService");
- addTextElement(serviceElement, "id", serviceNode.getIdentifier());
- addTextElement(serviceElement, "name", serviceNode.getName());
- addTextElement(serviceElement, "comment", serviceNode.getComments());
- addTextElement(serviceElement, "class", serviceNode.getControllerServiceImplementation().getClass().getCanonicalName());
-
- final ControllerServiceState state = serviceNode.getState();
- final boolean enabled = (state == ControllerServiceState.ENABLED || state == ControllerServiceState.ENABLING);
+ final Element serviceElement = element.getOwnerDocument().createElement("controllerService");
+ addTextElement(serviceElement, "id", serviceNode.getIdentifier());
+ addTextElement(serviceElement, "name", serviceNode.getName());
+ addTextElement(serviceElement, "comment", serviceNode.getComments());
+ addTextElement(serviceElement, "class", serviceNode.getControllerServiceImplementation().getClass().getCanonicalName());
+
+ final ControllerServiceState state = serviceNode.getState();
+ final boolean enabled = (state == ControllerServiceState.ENABLED || state == ControllerServiceState.ENABLING);
addTextElement(serviceElement, "enabled", String.valueOf(enabled));
-
+
addConfiguration(serviceElement, serviceNode.getProperties(), serviceNode.getAnnotationData(), encryptor);
-
- element.appendChild(serviceElement);
+
+ element.appendChild(serviceElement);
}
-
+
public static void addReportingTask(final Element element, final ReportingTaskNode taskNode, final StringEncryptor encryptor) {
- final Element taskElement = element.getOwnerDocument().createElement("reportingTask");
- addTextElement(taskElement, "id", taskNode.getIdentifier());
- addTextElement(taskElement, "name", taskNode.getName());
- addTextElement(taskElement, "comment", taskNode.getComments());
- addTextElement(taskElement, "class", taskNode.getReportingTask().getClass().getCanonicalName());
+ final Element taskElement = element.getOwnerDocument().createElement("reportingTask");
+ addTextElement(taskElement, "id", taskNode.getIdentifier());
+ addTextElement(taskElement, "name", taskNode.getName());
+ addTextElement(taskElement, "comment", taskNode.getComments());
+ addTextElement(taskElement, "class", taskNode.getReportingTask().getClass().getCanonicalName());
addTextElement(taskElement, "schedulingPeriod", taskNode.getSchedulingPeriod());
addTextElement(taskElement, "scheduledState", taskNode.getScheduledState().name());
addTextElement(taskElement, "schedulingStrategy", taskNode.getSchedulingStrategy().name());
-
- addConfiguration(taskElement, taskNode.getProperties(), taskNode.getAnnotationData(), encryptor);
-
- element.appendChild(taskElement);
+
+ addConfiguration(taskElement, taskNode.getProperties(), taskNode.getAnnotationData(), encryptor);
+
+ element.appendChild(taskElement);
}
-
+
private static void addTextElement(final Element element, final String name, final long value) {
addTextElement(element, name, String.valueOf(value));
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index fcfee83..1511293 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -338,7 +338,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
case RECONNECTION_REQUEST:
// Suspend heartbeats until we've reconnected. Otherwise,
// we may send a heartbeat while we are still in the process of
- // connecting, which will cause the Cluster Manager to mark us
+ // connecting, which will cause the Cluster Manager to mark us
// as "Connected," which becomes problematic as the FlowController's lock
// may still be held, causing this node to take a long time to respond to requests.
controller.suspendHeartbeats();
@@ -389,7 +389,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
/*
* Attempt to connect to the cluster. If the manager is able to
* provide a data flow, then the manager will send a connection
- * response. If the manager was unable to be located, then
+ * response. If the manager was unable to be located, then
* the response will be null and we should load the local dataflow
* and heartbeat until a manager is located.
*/
@@ -411,10 +411,10 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
controller.setConnected(false);
/*
- * Start heartbeating. Heartbeats will fail because we can't reach
- * the manager, but when we locate the manager, the node will
- * reconnect and establish a connection to the cluster. The
- * heartbeat is the trigger that will cause the manager to
+ * Start heartbeating. Heartbeats will fail because we can't reach
+ * the manager, but when we locate the manager, the node will
+ * reconnect and establish a connection to the cluster. The
+ * heartbeat is the trigger that will cause the manager to
* issue a reconnect request.
*/
controller.startHeartbeating();
@@ -515,7 +515,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
logger.info("Node reconnected.");
} catch (final Exception ex) {
- // disconnect controller
+ // disconnect controller
if (controller.isClustered()) {
disconnect();
}
@@ -618,7 +618,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
// load the controller tasks
// dao.loadReportingTasks(controller);
-
// initialize the flow
controller.initializeFlow();
@@ -638,11 +637,11 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
// send connection request to cluster manager
/*
- * Try to get a current copy of the cluster's dataflow from the manager
- * for ten times, sleeping between attempts. Ten times should be
+ * Try to get a current copy of the cluster's dataflow from the manager
+ * for ten times, sleeping between attempts. Ten times should be
* enough because the manager will register the node as connecting
* and therefore, no other changes to the cluster flow can occur.
- *
+ *
* However, the manager needs to obtain a current data flow within
* maxAttempts * tryLaterSeconds or else the node will fail to startup.
*/
@@ -813,7 +812,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
writeLock.lock();
try {
dao.save(controller, holder.shouldArchive);
- // Nulling it out if it is still set to our current SaveHolder. Otherwise leave it alone because it means
+ // Nulling it out if it is still set to our current SaveHolder. Otherwise leave it alone because it means
// another save is already pending.
final boolean noSavePending = StandardFlowService.this.saveHolder.compareAndSet(holder, null);
logger.info("Saved flow controller {} // Another save pending = {}", controller, !noSavePending);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 201482c..b66bedc 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -129,7 +129,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
}
@Override
- public void sync(final FlowController controller, final DataFlow proposedFlow, final StringEncryptor encryptor) throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException {
+ public void sync(final FlowController controller, final DataFlow proposedFlow, final StringEncryptor encryptor)
+ throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException {
// get the controller's root group
final ProcessGroup rootGroup = controller.getGroup(controller.getRootGroupId());
@@ -173,20 +174,20 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
final Element reportingTasksElement = (Element) DomUtils.getChild(rootElement, "reportingTasks");
final List<Element> taskElements;
- if ( reportingTasksElement == null ) {
+ if (reportingTasksElement == null) {
taskElements = Collections.emptyList();
} else {
taskElements = DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask");
}
-
+
final Element controllerServicesElement = (Element) DomUtils.getChild(rootElement, "controllerServices");
final List<Element> controllerServiceElements;
- if ( controllerServicesElement == null ) {
+ if (controllerServicesElement == null) {
controllerServiceElements = Collections.emptyList();
} else {
controllerServiceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService");
}
-
+
logger.trace("Parsing process group from DOM");
final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor);
@@ -230,14 +231,14 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
// create document by parsing proposed flow bytes
logger.trace("Parsing proposed flow bytes as DOM document");
final Document configuration = parseFlowBytes(proposedFlow.getFlow());
-
+
// attempt to sync controller with proposed flow
try {
if (configuration != null) {
synchronized (configuration) {
// get the root element
final Element rootElement = (Element) configuration.getElementsByTagName("flowController").item(0);
-
+
// set controller config
logger.trace("Updating flow config");
final Integer maxThreadCount = getInteger(rootElement, "maxThreadCount");
@@ -248,23 +249,23 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
controller.setMaxTimerDrivenThreadCount(maxThreadCount * 2 / 3);
controller.setMaxEventDrivenThreadCount(maxThreadCount / 3);
}
-
+
// get the root group XML element
final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
-
+
final Element controllerServicesElement = (Element) DomUtils.getChild(rootElement, "controllerServices");
- if ( controllerServicesElement != null ) {
- final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService");
-
- if ( !initialized || existingFlowEmpty ) {
- ControllerServiceLoader.loadControllerServices(serviceElements, controller, encryptor, controller.getBulletinRepository(), autoResumeState);
- } else {
- for ( final Element serviceElement : serviceElements ) {
- updateControllerService(controller, serviceElement, encryptor);
- }
- }
+ if (controllerServicesElement != null) {
+ final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService");
+
+ if (!initialized || existingFlowEmpty) {
+ ControllerServiceLoader.loadControllerServices(serviceElements, controller, encryptor, controller.getBulletinRepository(), autoResumeState);
+ } else {
+ for (final Element serviceElement : serviceElements) {
+ updateControllerService(controller, serviceElement, encryptor);
+ }
+ }
}
-
+
// if this controller isn't initialized or its emtpy, add the root group, otherwise update
if (!initialized || existingFlowEmpty) {
logger.trace("Adding root process group");
@@ -273,21 +274,21 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
logger.trace("Updating root process group");
updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor);
}
-
+
final Element reportingTasksElement = (Element) DomUtils.getChild(rootElement, "reportingTasks");
- if ( reportingTasksElement != null ) {
- final List<Element> taskElements = DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask");
- for ( final Element taskElement : taskElements ) {
- if ( !initialized || existingFlowEmpty ) {
- addReportingTask(controller, taskElement, encryptor);
- } else {
- updateReportingTask(controller, taskElement, encryptor);
- }
- }
+ if (reportingTasksElement != null) {
+ final List<Element> taskElements = DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask");
+ for (final Element taskElement : taskElements) {
+ if (!initialized || existingFlowEmpty) {
+ addReportingTask(controller, taskElement, encryptor);
+ } else {
+ updateReportingTask(controller, taskElement, encryptor);
+ }
+ }
}
}
}
-
+
logger.trace("Synching templates");
if ((existingTemplates == null || existingTemplates.length == 0) && proposedFlow.getTemplates() != null && proposedFlow.getTemplates().length > 0) {
// need to load templates
@@ -370,105 +371,104 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
return baos.toByteArray();
}
-
-
+
private void updateControllerService(final FlowController controller, final Element controllerServiceElement, final StringEncryptor encryptor) {
- final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
-
- final ControllerServiceState dtoState = ControllerServiceState.valueOf(dto.getState());
+ final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
+
+ final ControllerServiceState dtoState = ControllerServiceState.valueOf(dto.getState());
final boolean dtoEnabled = (dtoState == ControllerServiceState.ENABLED || dtoState == ControllerServiceState.ENABLING);
-
+
final ControllerServiceNode serviceNode = controller.getControllerServiceNode(dto.getId());
final ControllerServiceState serviceState = serviceNode.getState();
final boolean serviceEnabled = (serviceState == ControllerServiceState.ENABLED || serviceState == ControllerServiceState.ENABLING);
-
- if (dtoEnabled && !serviceEnabled) {
- controller.enableControllerService(controller.getControllerServiceNode(dto.getId()));
- } else if (!dtoEnabled && serviceEnabled) {
- controller.disableControllerService(controller.getControllerServiceNode(dto.getId()));
- }
+
+ if (dtoEnabled && !serviceEnabled) {
+ controller.enableControllerService(controller.getControllerServiceNode(dto.getId()));
+ } else if (!dtoEnabled && serviceEnabled) {
+ controller.disableControllerService(controller.getControllerServiceNode(dto.getId()));
+ }
}
-
+
private void addReportingTask(final FlowController controller, final Element reportingTaskElement, final StringEncryptor encryptor) throws ReportingTaskInstantiationException {
- final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(reportingTaskElement, encryptor);
-
- final ReportingTaskNode reportingTask = controller.createReportingTask(dto.getType(), dto.getId(), false);
- reportingTask.setName(dto.getName());
- reportingTask.setComments(dto.getComments());
- reportingTask.setScheduldingPeriod(dto.getSchedulingPeriod());
- reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(dto.getSchedulingStrategy()));
-
- reportingTask.setAnnotationData(dto.getAnnotationData());
-
+ final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(reportingTaskElement, encryptor);
+
+ final ReportingTaskNode reportingTask = controller.createReportingTask(dto.getType(), dto.getId(), false);
+ reportingTask.setName(dto.getName());
+ reportingTask.setComments(dto.getComments());
+ reportingTask.setScheduldingPeriod(dto.getSchedulingPeriod());
+ reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(dto.getSchedulingStrategy()));
+
+ reportingTask.setAnnotationData(dto.getAnnotationData());
+
for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) {
if (entry.getValue() == null) {
- reportingTask.removeProperty(entry.getKey());
+ reportingTask.removeProperty(entry.getKey());
} else {
- reportingTask.setProperty(entry.getKey(), entry.getValue());
+ reportingTask.setProperty(entry.getKey(), entry.getValue());
}
}
-
+
final ComponentLog componentLog = new SimpleProcessLogger(dto.getId(), reportingTask.getReportingTask());
final ReportingInitializationContext config = new StandardReportingInitializationContext(dto.getId(), dto.getName(),
SchedulingStrategy.valueOf(dto.getSchedulingStrategy()), dto.getSchedulingPeriod(), componentLog, controller);
-
+
try {
reportingTask.getReportingTask().initialize(config);
} catch (final InitializationException ie) {
throw new ReportingTaskInstantiationException("Failed to initialize reporting task of type " + dto.getType(), ie);
}
-
- if ( autoResumeState ) {
- if ( ScheduledState.RUNNING.name().equals(dto.getState()) ) {
- try {
- controller.startReportingTask(reportingTask);
- } catch (final Exception e) {
- logger.error("Failed to start {} due to {}", reportingTask, e);
- if ( logger.isDebugEnabled() ) {
- logger.error("", e);
- }
- controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin(
- "Reporting Tasks", Severity.ERROR.name(), "Failed to start " + reportingTask + " due to " + e));
- }
- } else if ( ScheduledState.DISABLED.name().equals(dto.getState()) ) {
- try {
- controller.disableReportingTask(reportingTask);
- } catch (final Exception e) {
- logger.error("Failed to mark {} as disabled due to {}", reportingTask, e);
- if ( logger.isDebugEnabled() ) {
- logger.error("", e);
- }
- controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin(
- "Reporting Tasks", Severity.ERROR.name(), "Failed to mark " + reportingTask + " as disabled due to " + e));
- }
- }
+
+ if (autoResumeState) {
+ if (ScheduledState.RUNNING.name().equals(dto.getState())) {
+ try {
+ controller.startReportingTask(reportingTask);
+ } catch (final Exception e) {
+ logger.error("Failed to start {} due to {}", reportingTask, e);
+ if (logger.isDebugEnabled()) {
+ logger.error("", e);
+ }
+ controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin(
+ "Reporting Tasks", Severity.ERROR.name(), "Failed to start " + reportingTask + " due to " + e));
+ }
+ } else if (ScheduledState.DISABLED.name().equals(dto.getState())) {
+ try {
+ controller.disableReportingTask(reportingTask);
+ } catch (final Exception e) {
+ logger.error("Failed to mark {} as disabled due to {}", reportingTask, e);
+ if (logger.isDebugEnabled()) {
+ logger.error("", e);
+ }
+ controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin(
+ "Reporting Tasks", Severity.ERROR.name(), "Failed to mark " + reportingTask + " as disabled due to " + e));
+ }
+ }
}
}
private void updateReportingTask(final FlowController controller, final Element reportingTaskElement, final StringEncryptor encryptor) {
- final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(reportingTaskElement, encryptor);
- final ReportingTaskNode taskNode = controller.getReportingTaskNode(dto.getId());
-
+ final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(reportingTaskElement, encryptor);
+ final ReportingTaskNode taskNode = controller.getReportingTaskNode(dto.getId());
+
if (!taskNode.getScheduledState().name().equals(dto.getState())) {
try {
switch (ScheduledState.valueOf(dto.getState())) {
case DISABLED:
- if ( taskNode.isRunning() ) {
- controller.stopReportingTask(taskNode);
- }
- controller.disableReportingTask(taskNode);
+ if (taskNode.isRunning()) {
+ controller.stopReportingTask(taskNode);
+ }
+ controller.disableReportingTask(taskNode);
break;
case RUNNING:
- if ( taskNode.getScheduledState() == ScheduledState.DISABLED ) {
- controller.enableReportingTask(taskNode);
- }
- controller.startReportingTask(taskNode);
+ if (taskNode.getScheduledState() == ScheduledState.DISABLED) {
+ controller.enableReportingTask(taskNode);
+ }
+ controller.startReportingTask(taskNode);
break;
case STOPPED:
if (taskNode.getScheduledState() == ScheduledState.DISABLED) {
- controller.enableReportingTask(taskNode);
+ controller.enableReportingTask(taskNode);
} else if (taskNode.getScheduledState() == ScheduledState.RUNNING) {
- controller.stopReportingTask(taskNode);
+ controller.stopReportingTask(taskNode);
}
break;
}
@@ -486,9 +486,9 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
}
}
}
-
-
- private ProcessGroup updateProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, final StringEncryptor encryptor) throws ProcessorInstantiationException {
+
+ private ProcessGroup updateProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, final StringEncryptor encryptor)
+ throws ProcessorInstantiationException {
// get the parent group ID
final String parentId = (parentGroup == null) ? null : parentGroup.getIdentifier();
@@ -698,7 +698,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
return new Position(dto.getX(), dto.getY());
}
- private void updateProcessor(final ProcessorNode procNode, final ProcessorDTO processorDTO, final ProcessGroup processGroup, final FlowController controller) throws ProcessorInstantiationException {
+ private void updateProcessor(final ProcessorNode procNode, final ProcessorDTO processorDTO, final ProcessGroup processGroup, final FlowController controller)
+ throws ProcessorInstantiationException {
final ProcessorConfigDTO config = processorDTO.getConfig();
procNode.setPosition(toPosition(processorDTO.getPosition()));
procNode.setName(processorDTO.getName());
@@ -747,7 +748,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
}
}
- private ProcessGroup addProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, final StringEncryptor encryptor) throws ProcessorInstantiationException {
+ private ProcessGroup addProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, final StringEncryptor encryptor)
+ throws ProcessorInstantiationException {
// get the parent group ID
final String parentId = (parentGroup == null) ? null : parentGroup.getIdentifier();
@@ -866,7 +868,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
final FunnelDTO funnelDTO = FlowFromDOMFactory.getFunnel(funnelElement);
final Funnel funnel = controller.createFunnel(funnelDTO.getId());
funnel.setPosition(toPosition(funnelDTO.getPosition()));
-
+
// Since this is called during startup, we want to add the funnel without enabling it
// and then tell the controller to enable it. This way, if the controller is not fully
// initialized, the starting of the funnel is delayed until the controller is ready.
@@ -1031,7 +1033,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
* Returns true if the given controller can inherit the proposed flow
* without orphaning flow files.
*
- * @param existingFlow
+ * @param existingFlow flow
* @param controller the running controller
* @param proposedFlow the flow to inherit
*
@@ -1081,7 +1083,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
* Returns true if the given controller can inherit the proposed flow
* without orphaning flow files.
*
- * @param existingFlow
+ * @param existingFlow flow
* @param proposedFlow the flow to inherit
*
* @return null if the controller can inherit the specified flow, an