You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/04/10 15:43:50 UTC
[15/62] [abbrv] incubator-nifi git commit: Squashed commit of the
following:
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 14640d8..086c46b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -106,7 +106,7 @@ import org.apache.nifi.web.api.dto.PreviousValueDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
-import org.apache.nifi.web.api.dto.ProcessorHistoryDTO;
+import org.apache.nifi.web.api.dto.ComponentHistoryDTO;
import org.apache.nifi.web.api.dto.PropertyHistoryDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
@@ -152,6 +152,19 @@ import org.apache.nifi.web.util.SnippetUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceReference;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
+import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
+import org.apache.nifi.web.api.dto.ReportingTaskDTO;
+import org.apache.nifi.web.dao.ControllerServiceDAO;
+import org.apache.nifi.web.dao.ReportingTaskDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.security.access.AccessDeniedException;
@@ -162,8 +175,6 @@ import org.springframework.security.access.AccessDeniedException;
public class StandardNiFiServiceFacade implements NiFiServiceFacade {
private static final Logger logger = LoggerFactory.getLogger(StandardNiFiServiceFacade.class);
- private static final String INVALID_REVISION_ERROR = "Given revision %s does not match current revision %s.";
- private static final String SYNC_ERROR = "This NiFi instance has been updated by '%s'. Please refresh to synchronize the view.";
// nifi core components
private ControllerFacade controllerFacade;
@@ -182,6 +193,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
private PortDAO inputPortDAO;
private PortDAO outputPortDAO;
private ConnectionDAO connectionDAO;
+ private ControllerServiceDAO controllerServiceDAO;
+ private ReportingTaskDAO reportingTaskDAO;
private TemplateDAO templateDAO;
// administrative services
@@ -195,54 +208,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
private NiFiProperties properties;
private DtoFactory dtoFactory;
- /**
- * Checks the specified revision against the current revision.
- *
- * @param revision The revision to check
- * @param clientId The client id
- * @return Whether or not the request should proceed
- * @throws NiFiCoreException If the specified revision is not current
- */
- private void checkRevision(Revision revision) {
-
- boolean approved = optimisticLockingManager.isCurrent(revision);
-
- if (!approved) {
- Revision currentRevision = optimisticLockingManager.getRevision();
- logger.debug("Revision check failed because current revision is " + currentRevision + " but supplied revision is " + revision);
-
- if (StringUtils.isBlank(currentRevision.getClientId()) || currentRevision.getVersion() == null) {
- throw new InvalidRevisionException(String.format(INVALID_REVISION_ERROR, revision, currentRevision));
- } else {
- throw new InvalidRevisionException(String.format(SYNC_ERROR, optimisticLockingManager.getLastModifier()));
- }
- }
- }
-
- /**
- * Increments the revision and updates the last modifier.
- *
- * @param revision
- * @return
- */
- private Revision updateRevision(Revision revision) {
- // update the client id and modifier
- final Revision updatedRevision = optimisticLockingManager.incrementRevision(revision.getClientId());
-
- // get the nifi user to extract the username
- NiFiUser user = NiFiUserUtils.getNiFiUser();
- if (user == null) {
- optimisticLockingManager.setLastModifier("unknown");
- } else {
- optimisticLockingManager.setLastModifier(user.getUserName());
- }
-
- return updatedRevision;
- }
-
// -----------------------------------------
// Verification Operations
// -----------------------------------------
+
@Override
public void verifyCreateConnection(String groupId, ConnectionDTO connectionDTO) {
connectionDAO.verifyCreate(groupId, connectionDTO);
@@ -360,100 +329,123 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
remoteProcessGroupDAO.verifyDelete(groupId, remoteProcessGroupId);
}
+ @Override
+ public void verifyUpdateControllerService(ControllerServiceDTO controllerServiceDTO) {
+ // if service does not exist, then the update request is likely creating it
+ // so we don't verify since it will fail
+ if (controllerServiceDAO.hasControllerService(controllerServiceDTO.getId())) {
+ controllerServiceDAO.verifyUpdate(controllerServiceDTO);
+ }
+ }
+
+ @Override
+ public void verifyUpdateControllerServiceReferencingComponents(String controllerServiceId, ScheduledState scheduledState, ControllerServiceState controllerServiceState) {
+ controllerServiceDAO.verifyUpdateReferencingComponents(controllerServiceId, scheduledState, controllerServiceState);
+ }
+
+ @Override
+ public void verifyDeleteControllerService(String controllerServiceId) {
+ controllerServiceDAO.verifyDelete(controllerServiceId);
+ }
+
+ @Override
+ public void verifyUpdateReportingTask(ReportingTaskDTO reportingTaskDTO) {
+ // if the task does not exist, then the update request is likely creating it
+ // so we don't verify since it will fail
+ if (reportingTaskDAO.hasReportingTask(reportingTaskDTO.getId())) {
+ reportingTaskDAO.verifyUpdate(reportingTaskDTO);
+ }
+ }
+
+ @Override
+ public void verifyDeleteReportingTask(String reportingTaskId) {
+ reportingTaskDAO.verifyDelete(reportingTaskId);
+ }
+
// -----------------------------------------
// Write Operations
// -----------------------------------------
+
@Override
- public ConfigurationSnapshot<ConnectionDTO> updateConnection(Revision revision, String groupId, ConnectionDTO connectionDTO) {
-
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
+ public ConfigurationSnapshot<ConnectionDTO> updateConnection(final Revision revision, final String groupId, final ConnectionDTO connectionDTO) {
// if connection does not exist, then create new connection
if (connectionDAO.hasConnection(groupId, connectionDTO.getId()) == false) {
return createConnection(revision, groupId, connectionDTO);
}
+
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ConnectionDTO>() {
+ @Override
+ public ConnectionDTO execute() {
+ final Connection connection = connectionDAO.updateConnection(groupId, connectionDTO);
- final Connection connection = connectionDAO.updateConnection(groupId, connectionDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<ConnectionDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createConnectionDto(connection));
-
- // save the flow
- controllerFacade.save();
-
- return response;
+ controllerFacade.save();
+
+ return dtoFactory.createConnectionDto(connection);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<ProcessorDTO> updateProcessor(Revision revision, String groupId, ProcessorDTO processorDTO) {
-
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
+ public ConfigurationSnapshot<ProcessorDTO> updateProcessor(final Revision revision, final String groupId, final ProcessorDTO processorDTO) {
// if processor does not exist, then create new processor
if (processorDAO.hasProcessor(groupId, processorDTO.getId()) == false) {
return createProcessor(revision, groupId, processorDTO);
}
+
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessorDTO>() {
+ @Override
+ public ProcessorDTO execute() {
+ // update the processor
+ ProcessorNode processor = processorDAO.updateProcessor(groupId, processorDTO);
- // update the processor
- ProcessorNode processor = processorDAO.updateProcessor(groupId, processorDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<ProcessorDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createProcessorDto(processor));
-
- // save the flow
- controllerFacade.save();
-
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return dtoFactory.createProcessorDto(processor);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<LabelDTO> updateLabel(Revision revision, String groupId, LabelDTO labelDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
+ public ConfigurationSnapshot<LabelDTO> updateLabel(final Revision revision, final String groupId, final LabelDTO labelDTO) {
// if label does not exist, then create new label
if (labelDAO.hasLabel(groupId, labelDTO.getId()) == false) {
return createLabel(revision, groupId, labelDTO);
}
+
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<LabelDTO>() {
+ @Override
+ public LabelDTO execute() {
+ // update the existing label
+ final Label label = labelDAO.updateLabel(groupId, labelDTO);
- // update the existing label
- final Label label = labelDAO.updateLabel(groupId, labelDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<LabelDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createLabelDto(label));
-
- // save updated controller
- controllerFacade.save();
-
- return response;
+ // save updated controller
+ controllerFacade.save();
+
+ return dtoFactory.createLabelDto(label);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<FunnelDTO> updateFunnel(Revision revision, String groupId, FunnelDTO funnelDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
+ public ConfigurationSnapshot<FunnelDTO> updateFunnel(final Revision revision, final String groupId, final FunnelDTO funnelDTO) {
// if label does not exist, then create new label
if (funnelDAO.hasFunnel(groupId, funnelDTO.getId()) == false) {
return createFunnel(revision, groupId, funnelDTO);
}
+
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<FunnelDTO>() {
+ @Override
+ public FunnelDTO execute() {
+ // update the existing funnel
+ final Funnel funnel = funnelDAO.updateFunnel(groupId, funnelDTO);
- // update the existing label
- final Funnel funnel = funnelDAO.updateFunnel(groupId, funnelDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<FunnelDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createFunnelDto(funnel));
-
- // save updated controller
- controllerFacade.save();
-
- return response;
+ // save updated controller
+ controllerFacade.save();
+
+ return dtoFactory.createFunnelDto(funnel);
+ }
+ });
}
@Override
@@ -466,141 +458,126 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
- public ConfigurationSnapshot<SnippetDTO> updateSnippet(Revision revision, SnippetDTO snippetDto) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
+ public ConfigurationSnapshot<SnippetDTO> updateSnippet(final Revision revision, final SnippetDTO snippetDto) {
// if label does not exist, then create new label
if (snippetDAO.hasSnippet(snippetDto.getId()) == false) {
return createSnippet(revision, snippetDto);
}
+
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<SnippetDTO>() {
+ @Override
+ public SnippetDTO execute() {
+ // update the snippet
+ final Snippet snippet = snippetDAO.updateSnippet(snippetDto);
- // update the snippet
- final Snippet snippet = snippetDAO.updateSnippet(snippetDto);
+ // build the snippet dto
+ final SnippetDTO responseSnippetDto = dtoFactory.createSnippetDto(snippet);
+ responseSnippetDto.setContents(snippetUtils.populateFlowSnippet(snippet, false));
- // build the snippet dto
- final SnippetDTO responseSnippetDto = dtoFactory.createSnippetDto(snippet);
- responseSnippetDto.setContents(snippetUtils.populateFlowSnippet(snippet, false));
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<SnippetDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), responseSnippetDto);
-
- // save updated controller if applicable
- if (snippetDto.getParentGroupId() != null && snippet.isLinked()) {
- controllerFacade.save();
- }
-
- return response;
+ // save updated controller if applicable
+ if (snippetDto.getParentGroupId() != null && snippet.isLinked()) {
+ controllerFacade.save();
+ }
+
+ return responseSnippetDto;
+ }
+ });
}
@Override
- public ConfigurationSnapshot<PortDTO> updateInputPort(Revision revision, String groupId, PortDTO inputPortDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
+ public ConfigurationSnapshot<PortDTO> updateInputPort(final Revision revision, final String groupId, final PortDTO inputPortDTO) {
// if input port does not exist, then create new input port
if (inputPortDAO.hasPort(groupId, inputPortDTO.getId()) == false) {
return createInputPort(revision, groupId, inputPortDTO);
}
+
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<PortDTO>() {
+ @Override
+ public PortDTO execute() {
+ final Port inputPort = inputPortDAO.updatePort(groupId, inputPortDTO);
- final Port inputPort = inputPortDAO.updatePort(groupId, inputPortDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<PortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createPortDto(inputPort));
-
- // save updated controller
- controllerFacade.save();
-
- return response;
+ // save updated controller
+ controllerFacade.save();
+
+ return dtoFactory.createPortDto(inputPort);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<PortDTO> updateOutputPort(Revision revision, String groupId, PortDTO outputPortDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
+ public ConfigurationSnapshot<PortDTO> updateOutputPort(final Revision revision, final String groupId, final PortDTO outputPortDTO) {
// if output port does not exist, then create new output port
if (outputPortDAO.hasPort(groupId, outputPortDTO.getId()) == false) {
return createOutputPort(revision, groupId, outputPortDTO);
}
+
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<PortDTO>() {
+ @Override
+ public PortDTO execute() {
+ final Port outputPort = outputPortDAO.updatePort(groupId, outputPortDTO);
- final Port outputPort = outputPortDAO.updatePort(groupId, outputPortDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<PortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createPortDto(outputPort));
-
- // save updated controller
- controllerFacade.save();
-
- return response;
+ // save updated controller
+ controllerFacade.save();
+
+ return dtoFactory.createPortDto(outputPort);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<RemoteProcessGroupDTO> updateRemoteProcessGroup(Revision revision, String groupId, RemoteProcessGroupDTO remoteProcessGroupDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
+ public ConfigurationSnapshot<RemoteProcessGroupDTO> updateRemoteProcessGroup(final Revision revision, final String groupId, final RemoteProcessGroupDTO remoteProcessGroupDTO) {
// if controller reference does not exist, then create new controller reference
if (remoteProcessGroupDAO.hasRemoteProcessGroup(groupId, remoteProcessGroupDTO.getId()) == false) {
return createRemoteProcessGroup(revision, groupId, remoteProcessGroupDTO);
}
+
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<RemoteProcessGroupDTO>() {
+ @Override
+ public RemoteProcessGroupDTO execute() {
+ RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.updateRemoteProcessGroup(groupId, remoteProcessGroupDTO);
- RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.updateRemoteProcessGroup(groupId, remoteProcessGroupDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<RemoteProcessGroupDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup));
-
- // save updated controller
- controllerFacade.save();
-
- return response;
+ // save updated controller
+ controllerFacade.save();
+
+ return dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<RemoteProcessGroupPortDTO> updateRemoteProcessGroupInputPort(Revision revision, String groupId, String remoteProcessGroupId, RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // update the remote port
- RemoteGroupPort remoteGroupPort = remoteProcessGroupDAO.updateRemoteProcessGroupInputPort(groupId, remoteProcessGroupId, remoteProcessGroupPortDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<RemoteProcessGroupPortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort));
+ public ConfigurationSnapshot<RemoteProcessGroupPortDTO> updateRemoteProcessGroupInputPort(final Revision revision, final String groupId, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<RemoteProcessGroupPortDTO>() {
+ @Override
+ public RemoteProcessGroupPortDTO execute() {
+ // update the remote port
+ RemoteGroupPort remoteGroupPort = remoteProcessGroupDAO.updateRemoteProcessGroupInputPort(groupId, remoteProcessGroupId, remoteProcessGroupPortDTO);
- // save updated controller
- controllerFacade.save();
-
- return response;
+ // save updated controller
+ controllerFacade.save();
+
+ return dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<RemoteProcessGroupPortDTO> updateRemoteProcessGroupOutputPort(Revision revision, String groupId, String remoteProcessGroupId, RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // update the remote port
- RemoteGroupPort remoteGroupPort = remoteProcessGroupDAO.updateRemoteProcessGroupOutputPort(groupId, remoteProcessGroupId, remoteProcessGroupPortDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<RemoteProcessGroupPortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort));
+ public ConfigurationSnapshot<RemoteProcessGroupPortDTO> updateRemoteProcessGroupOutputPort(final Revision revision, final String groupId, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<RemoteProcessGroupPortDTO>() {
+ @Override
+ public RemoteProcessGroupPortDTO execute() {
+ // update the remote port
+ RemoteGroupPort remoteGroupPort = remoteProcessGroupDAO.updateRemoteProcessGroupOutputPort(groupId, remoteProcessGroupId, remoteProcessGroupPortDTO);
- // save updated controller
- controllerFacade.save();
-
- return response;
+ // save updated controller
+ controllerFacade.save();
+
+ return dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<ProcessGroupDTO> updateProcessGroup(Revision revision, String parentGroupId, ProcessGroupDTO processGroupDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
+ public ConfigurationSnapshot<ProcessGroupDTO> updateProcessGroup(final Revision revision, final String parentGroupId, final ProcessGroupDTO processGroupDTO) {
// if process group does not exist, then create new process group
if (processGroupDAO.hasProcessGroup(processGroupDTO.getId()) == false) {
if (parentGroupId == null) {
@@ -609,50 +586,49 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return createProcessGroup(parentGroupId, revision, processGroupDTO);
}
}
+
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessGroupDTO>() {
+ @Override
+ public ProcessGroupDTO execute() {
+ // update the process group
+ ProcessGroup processGroup = processGroupDAO.updateProcessGroup(processGroupDTO);
- // update the process group
- ProcessGroup processGroup = processGroupDAO.updateProcessGroup(processGroupDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<ProcessGroupDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createProcessGroupDto(processGroup));
-
- // save updated controller
- controllerFacade.save();
-
- return response;
+ // save updated controller
+ controllerFacade.save();
+
+ return dtoFactory.createProcessGroupDto(processGroup);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<ControllerConfigurationDTO> updateControllerConfiguration(Revision revision, ControllerConfigurationDTO controllerConfigurationDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // update the controller configuration through the proxy
- if (controllerConfigurationDTO.getName() != null) {
- controllerFacade.setName(controllerConfigurationDTO.getName());
- }
- if (controllerConfigurationDTO.getComments() != null) {
- controllerFacade.setComments(controllerConfigurationDTO.getComments());
- }
- if (controllerConfigurationDTO.getMaxTimerDrivenThreadCount() != null) {
- controllerFacade.setMaxTimerDrivenThreadCount(controllerConfigurationDTO.getMaxTimerDrivenThreadCount());
- }
- if (controllerConfigurationDTO.getMaxEventDrivenThreadCount() != null) {
- controllerFacade.setMaxEventDrivenThreadCount(controllerConfigurationDTO.getMaxEventDrivenThreadCount());
- }
-
- // create the controller configuration dto
- ControllerConfigurationDTO controllerConfig = getControllerConfiguration();
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<ControllerConfigurationDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), controllerConfig);
+ public ConfigurationSnapshot<ControllerConfigurationDTO> updateControllerConfiguration(final Revision revision, final ControllerConfigurationDTO controllerConfigurationDTO) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ControllerConfigurationDTO>() {
+ @Override
+ public ControllerConfigurationDTO execute() {
+ // update the controller configuration through the proxy
+ if (controllerConfigurationDTO.getName() != null) {
+ controllerFacade.setName(controllerConfigurationDTO.getName());
+ }
+ if (controllerConfigurationDTO.getComments() != null) {
+ controllerFacade.setComments(controllerConfigurationDTO.getComments());
+ }
+ if (controllerConfigurationDTO.getMaxTimerDrivenThreadCount() != null) {
+ controllerFacade.setMaxTimerDrivenThreadCount(controllerConfigurationDTO.getMaxTimerDrivenThreadCount());
+ }
+ if (controllerConfigurationDTO.getMaxEventDrivenThreadCount() != null) {
+ controllerFacade.setMaxEventDrivenThreadCount(controllerConfigurationDTO.getMaxEventDrivenThreadCount());
+ }
- // save the flow
- controllerFacade.save();
+ // create the controller configuration dto
+ ControllerConfigurationDTO controllerConfig = getControllerConfiguration();
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return controllerConfig;
+ }
+ });
}
@Override
@@ -685,74 +661,66 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
- public ConfigurationSnapshot<Void> deleteConnection(Revision revision, String groupId, String connectionId) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- connectionDAO.deleteConnection(groupId, connectionId);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
+ public ConfigurationSnapshot<Void> deleteConnection(final Revision revision, final String groupId, final String connectionId) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>(){
+ @Override
+ public Void execute() {
+ connectionDAO.deleteConnection(groupId, connectionId);
- // save the flow
- controllerFacade.save();
-
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return null;
+ }
+ });
}
@Override
- public ConfigurationSnapshot<Void> deleteProcessor(Revision revision, String groupId, String processorId) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // delete the processor and synchronize the connection state
- processorDAO.deleteProcessor(groupId, processorId);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
+ public ConfigurationSnapshot<Void> deleteProcessor(final Revision revision, final String groupId, final String processorId) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+ @Override
+ public Void execute() {
+ // delete the processor and synchronize the connection state
+ processorDAO.deleteProcessor(groupId, processorId);
- // save the flow
- controllerFacade.save();
-
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return null;
+ }
+ });
}
@Override
- public ConfigurationSnapshot<Void> deleteLabel(Revision revision, String groupId, String labelId) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // delete the label
- labelDAO.deleteLabel(groupId, labelId);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
-
- // save the flow
- controllerFacade.save();
+ public ConfigurationSnapshot<Void> deleteLabel(final Revision revision, final String groupId, final String labelId) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+ @Override
+ public Void execute() {
+ // delete the label
+ labelDAO.deleteLabel(groupId, labelId);
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return null;
+ }
+ });
}
@Override
- public ConfigurationSnapshot<Void> deleteFunnel(Revision revision, String groupId, String funnelId) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // delete the label
- funnelDAO.deleteFunnel(groupId, funnelId);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
-
- // save the flow
- controllerFacade.save();
+ public ConfigurationSnapshot<Void> deleteFunnel(final Revision revision, final String groupId, final String funnelId) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+ @Override
+ public Void execute() {
+ // delete the funnel
+ funnelDAO.deleteFunnel(groupId, funnelId);
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return null;
+ }
+ });
}
@Override
@@ -761,95 +729,85 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
- public ConfigurationSnapshot<Void> deleteSnippet(Revision revision, String snippetId) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // determine if this snippet was linked to the data flow
- Snippet snippet = snippetDAO.getSnippet(snippetId);
- boolean linked = snippet.isLinked();
-
- // delete the snippet
- snippetDAO.deleteSnippet(snippetId);
+ public ConfigurationSnapshot<Void> deleteSnippet(final Revision revision, final String snippetId) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+ @Override
+ public Void execute() {
+ // determine if this snippet was linked to the data flow
+ Snippet snippet = snippetDAO.getSnippet(snippetId);
+ boolean linked = snippet.isLinked();
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
+ // delete the snippet
+ snippetDAO.deleteSnippet(snippetId);
- // save the flow if necessary
- if (linked) {
- controllerFacade.save();
- }
-
- return response;
+ // save the flow if necessary
+ if (linked) {
+ controllerFacade.save();
+ }
+
+ return null;
+ }
+ });
}
@Override
- public ConfigurationSnapshot<Void> deleteInputPort(Revision revision, String groupId, String inputPortId) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- inputPortDAO.deletePort(groupId, inputPortId);
+ public ConfigurationSnapshot<Void> deleteInputPort(final Revision revision, final String groupId, final String inputPortId) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+ @Override
+ public Void execute() {
+ inputPortDAO.deletePort(groupId, inputPortId);
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
-
- // save the flow
- controllerFacade.save();
-
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return null;
+ }
+ });
}
@Override
- public ConfigurationSnapshot<Void> deleteOutputPort(Revision revision, String groupId, String outputPortId) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- outputPortDAO.deletePort(groupId, outputPortId);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
+ public ConfigurationSnapshot<Void> deleteOutputPort(final Revision revision, final String groupId, final String outputPortId) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+ @Override
+ public Void execute() {
+ outputPortDAO.deletePort(groupId, outputPortId);
- // save the flow
- controllerFacade.save();
-
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return null;
+ }
+ });
}
@Override
- public ConfigurationSnapshot<Void> deleteProcessGroup(Revision revision, String groupId) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- processGroupDAO.deleteProcessGroup(groupId);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
+ public ConfigurationSnapshot<Void> deleteProcessGroup(final Revision revision, final String groupId) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+ @Override
+ public Void execute() {
+ processGroupDAO.deleteProcessGroup(groupId);
- // save the flow
- controllerFacade.save();
-
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return null;
+ }
+ });
}
@Override
- public ConfigurationSnapshot<Void> deleteRemoteProcessGroup(Revision revision, String groupId, String remoteProcessGroupId) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- remoteProcessGroupDAO.deleteRemoteProcessGroup(groupId, remoteProcessGroupId);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
+ public ConfigurationSnapshot<Void> deleteRemoteProcessGroup(final Revision revision, final String groupId, final String remoteProcessGroupId) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+ @Override
+ public Void execute() {
+ remoteProcessGroupDAO.deleteRemoteProcessGroup(groupId, remoteProcessGroupId);
- // save the flow
- controllerFacade.save();
-
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return null;
+ }
+ });
}
@Override
@@ -859,97 +817,86 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
- public ConfigurationSnapshot<ConnectionDTO> createConnection(Revision revision, String groupId, ConnectionDTO connectionDTO) {
-
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // ensure id is set
- if (StringUtils.isBlank(connectionDTO.getId())) {
- connectionDTO.setId(UUID.randomUUID().toString());
- }
-
- final Connection connection = connectionDAO.createConnection(groupId, connectionDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<ConnectionDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createConnectionDto(connection));
+ public ConfigurationSnapshot<ConnectionDTO> createConnection(final Revision revision, final String groupId, final ConnectionDTO connectionDTO) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ConnectionDTO>() {
+ @Override
+ public ConnectionDTO execute() {
+ // ensure id is set
+ if (StringUtils.isBlank(connectionDTO.getId())) {
+ connectionDTO.setId(UUID.randomUUID().toString());
+ }
- // save the flow
- controllerFacade.save();
+ final Connection connection = connectionDAO.createConnection(groupId, connectionDTO);
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return dtoFactory.createConnectionDto(connection);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<ProcessorDTO> createProcessor(Revision revision, String groupId, ProcessorDTO processorDTO) {
-
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // ensure id is set
- if (StringUtils.isBlank(processorDTO.getId())) {
- processorDTO.setId(UUID.randomUUID().toString());
- }
-
- // create the processor
- final ProcessorNode processor = processorDAO.createProcessor(groupId, processorDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<ProcessorDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createProcessorDto(processor));
+ public ConfigurationSnapshot<ProcessorDTO> createProcessor(final Revision revision, final String groupId, final ProcessorDTO processorDTO) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessorDTO>() {
+ @Override
+ public ProcessorDTO execute() {
+ // ensure id is set
+ if (StringUtils.isBlank(processorDTO.getId())) {
+ processorDTO.setId(UUID.randomUUID().toString());
+ }
- // save the flow
- controllerFacade.save();
+ // create the processor
+ final ProcessorNode processor = processorDAO.createProcessor(groupId, processorDTO);
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return dtoFactory.createProcessorDto(processor);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<LabelDTO> createLabel(Revision revision, String groupId, LabelDTO labelDTO) {
-
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // ensure id is set
- if (StringUtils.isBlank(labelDTO.getId())) {
- labelDTO.setId(UUID.randomUUID().toString());
- }
-
- // add the label
- final Label label = labelDAO.createLabel(groupId, labelDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<LabelDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createLabelDto(label));
+ public ConfigurationSnapshot<LabelDTO> createLabel(final Revision revision, final String groupId, final LabelDTO labelDTO) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<LabelDTO>() {
+ @Override
+ public LabelDTO execute() {
+ // ensure id is set
+ if (StringUtils.isBlank(labelDTO.getId())) {
+ labelDTO.setId(UUID.randomUUID().toString());
+ }
- // save the flow
- controllerFacade.save();
+ // add the label
+ final Label label = labelDAO.createLabel(groupId, labelDTO);
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return dtoFactory.createLabelDto(label);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<FunnelDTO> createFunnel(Revision revision, String groupId, FunnelDTO funnelDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // ensure id is set
- if (StringUtils.isBlank(funnelDTO.getId())) {
- funnelDTO.setId(UUID.randomUUID().toString());
- }
-
- // add the label
- final Funnel funnel = funnelDAO.createFunnel(groupId, funnelDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<FunnelDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createFunnelDto(funnel));
+ public ConfigurationSnapshot<FunnelDTO> createFunnel(final Revision revision, final String groupId, final FunnelDTO funnelDTO) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<FunnelDTO>() {
+ @Override
+ public FunnelDTO execute() {
+ // ensure id is set
+ if (StringUtils.isBlank(funnelDTO.getId())) {
+ funnelDTO.setId(UUID.randomUUID().toString());
+ }
- // save the flow
- controllerFacade.save();
+ // add the funnel
+ final Funnel funnel = funnelDAO.createFunnel(groupId, funnelDTO);
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return dtoFactory.createFunnelDto(funnel);
+ }
+ });
}
private void validateSnippetContents(final FlowSnippetDTO flowSnippet, final String groupId) {
@@ -1008,139 +955,129 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
- public ConfigurationSnapshot<FlowSnippetDTO> copySnippet(Revision revision, String groupId, String snippetId, Double originX, Double originY) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // ensure id is set
- if (StringUtils.isBlank(snippetId)) {
- snippetId = UUID.randomUUID().toString();
- }
-
- // create the new snippet
- FlowSnippetDTO flowSnippet = snippetDAO.copySnippet(groupId, snippetId, originX, originY);
-
- // validate the new snippet
- validateSnippetContents(flowSnippet, groupId);
+ public ConfigurationSnapshot<FlowSnippetDTO> copySnippet(final Revision revision, final String groupId, final String snippetId, final Double originX, final Double originY) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<FlowSnippetDTO>() {
+ @Override
+ public FlowSnippetDTO execute() {
+ String id = snippetId;
+
+ // ensure id is set
+ if (StringUtils.isBlank(id)) {
+ id = UUID.randomUUID().toString();
+ }
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<FlowSnippetDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), flowSnippet);
+ // create the new snippet
+ FlowSnippetDTO flowSnippet = snippetDAO.copySnippet(groupId, id, originX, originY);
- // save the flow
- controllerFacade.save();
+ // validate the new snippet
+ validateSnippetContents(flowSnippet, groupId);
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return flowSnippet;
+ }
+ });
}
@Override
public ConfigurationSnapshot<SnippetDTO> createSnippet(final Revision revision, final SnippetDTO snippetDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // ensure id is set
- if (StringUtils.isBlank(snippetDTO.getId())) {
- snippetDTO.setId(UUID.randomUUID().toString());
- }
-
- // add the snippet
- final Snippet snippet = snippetDAO.createSnippet(snippetDTO);
- final SnippetDTO responseSnippetDTO = dtoFactory.createSnippetDto(snippet);
- responseSnippetDTO.setContents(snippetUtils.populateFlowSnippet(snippet, false));
-
- // create the response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<SnippetDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), responseSnippetDTO);
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<SnippetDTO>() {
+ @Override
+ public SnippetDTO execute() {
+ // ensure id is set
+ if (StringUtils.isBlank(snippetDTO.getId())) {
+ snippetDTO.setId(UUID.randomUUID().toString());
+ }
- return response;
+ // add the snippet
+ final Snippet snippet = snippetDAO.createSnippet(snippetDTO);
+ final SnippetDTO responseSnippetDTO = dtoFactory.createSnippetDto(snippet);
+ responseSnippetDTO.setContents(snippetUtils.populateFlowSnippet(snippet, false));
+
+ return responseSnippetDTO;
+ }
+ });
}
@Override
- public ConfigurationSnapshot<PortDTO> createInputPort(Revision revision, String groupId, PortDTO inputPortDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // ensure id is set
- if (StringUtils.isBlank(inputPortDTO.getId())) {
- inputPortDTO.setId(UUID.randomUUID().toString());
- }
-
- final Port inputPort = inputPortDAO.createPort(groupId, inputPortDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<PortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createPortDto(inputPort));
+ public ConfigurationSnapshot<PortDTO> createInputPort(final Revision revision, final String groupId, final PortDTO inputPortDTO) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<PortDTO>() {
+ @Override
+ public PortDTO execute() {
+ // ensure id is set
+ if (StringUtils.isBlank(inputPortDTO.getId())) {
+ inputPortDTO.setId(UUID.randomUUID().toString());
+ }
- // save the flow
- controllerFacade.save();
+ final Port inputPort = inputPortDAO.createPort(groupId, inputPortDTO);
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return dtoFactory.createPortDto(inputPort);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<PortDTO> createOutputPort(Revision revision, String groupId, PortDTO outputPortDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // ensure id is set
- if (StringUtils.isBlank(outputPortDTO.getId())) {
- outputPortDTO.setId(UUID.randomUUID().toString());
- }
-
- final Port outputPort = outputPortDAO.createPort(groupId, outputPortDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<PortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createPortDto(outputPort));
+ public ConfigurationSnapshot<PortDTO> createOutputPort(final Revision revision, final String groupId, final PortDTO outputPortDTO) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<PortDTO>() {
+ @Override
+ public PortDTO execute() {
+ // ensure id is set
+ if (StringUtils.isBlank(outputPortDTO.getId())) {
+ outputPortDTO.setId(UUID.randomUUID().toString());
+ }
- // save the flow
- controllerFacade.save();
+ final Port outputPort = outputPortDAO.createPort(groupId, outputPortDTO);
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return dtoFactory.createPortDto(outputPort);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<ProcessGroupDTO> createProcessGroup(String parentGroupId, Revision revision, ProcessGroupDTO processGroupDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // ensure id is set
- if (StringUtils.isBlank(processGroupDTO.getId())) {
- processGroupDTO.setId(UUID.randomUUID().toString());
- }
-
- final ProcessGroup processGroup = processGroupDAO.createProcessGroup(parentGroupId, processGroupDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<ProcessGroupDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createProcessGroupDto(processGroup));
+ public ConfigurationSnapshot<ProcessGroupDTO> createProcessGroup(final String parentGroupId, final Revision revision, final ProcessGroupDTO processGroupDTO) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessGroupDTO>() {
+ @Override
+ public ProcessGroupDTO execute() {
+ // ensure id is set
+ if (StringUtils.isBlank(processGroupDTO.getId())) {
+ processGroupDTO.setId(UUID.randomUUID().toString());
+ }
- // save the flow
- controllerFacade.save();
+ final ProcessGroup processGroup = processGroupDAO.createProcessGroup(parentGroupId, processGroupDTO);
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return dtoFactory.createProcessGroupDto(processGroup);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<RemoteProcessGroupDTO> createRemoteProcessGroup(Revision revision, String groupId, RemoteProcessGroupDTO remoteProcessGroupDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // ensure id is set
- if (StringUtils.isBlank(remoteProcessGroupDTO.getId())) {
- remoteProcessGroupDTO.setId(UUID.randomUUID().toString());
- }
-
- final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.createRemoteProcessGroup(groupId, remoteProcessGroupDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<RemoteProcessGroupDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup));
+ public ConfigurationSnapshot<RemoteProcessGroupDTO> createRemoteProcessGroup(final Revision revision, final String groupId, final RemoteProcessGroupDTO remoteProcessGroupDTO) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<RemoteProcessGroupDTO>() {
+ @Override
+ public RemoteProcessGroupDTO execute() {
+ // ensure id is set
+ if (StringUtils.isBlank(remoteProcessGroupDTO.getId())) {
+ remoteProcessGroupDTO.setId(UUID.randomUUID().toString());
+ }
- // save the flow
- controllerFacade.save();
+ final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.createRemoteProcessGroup(groupId, remoteProcessGroupDTO);
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup);
+ }
+ });
}
@Override
@@ -1186,74 +1123,217 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
- public ConfigurationSnapshot<FlowSnippetDTO> createTemplateInstance(Revision revision, String groupId, Double originX, Double originY, String templateId) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
+ public ConfigurationSnapshot<FlowSnippetDTO> createTemplateInstance(final Revision revision, final String groupId, final Double originX, final Double originY, final String templateId) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<FlowSnippetDTO>() {
+ @Override
+ public FlowSnippetDTO execute() {
+ // instantiate the template - there is no need to make another copy of the flow snippet since the actual template
+ // was copied and this dto is only used to instantiate its components (which has already been completed)
+ FlowSnippetDTO flowSnippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateId);
- // instantiate the template - there is no need to make another copy of the flow snippet since the actual template
- // was copied and this dto is only used to instantiate it's components (which as already completed)
- FlowSnippetDTO flowSnippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateId);
+ // validate the new snippet
+ validateSnippetContents(flowSnippet, groupId);
- // validate the new snippet
- validateSnippetContents(flowSnippet, groupId);
+ // save the flow
+ controllerFacade.save();
+
+ return flowSnippet;
+ }
+ });
+ }
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<FlowSnippetDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), flowSnippet);
+ @Override
+ public ConfigurationSnapshot<Void> createArchive(final Revision revision) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+ @Override
+ public Void execute() {
+ // create the archive
+ controllerFacade.createArchive();
+ return null;
+ }
+ });
+ }
- // save the flow
- controllerFacade.save();
+ @Override
+ public ConfigurationSnapshot<ProcessorDTO> setProcessorAnnotationData(final Revision revision, final String processorId, final String annotationData) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessorDTO>() {
+ @Override
+ public ProcessorDTO execute() {
+ // create the processor config
+ final ProcessorConfigDTO config = new ProcessorConfigDTO();
+ config.setAnnotationData(annotationData);
- return response;
+ // create the processor dto
+ final ProcessorDTO processorDTO = new ProcessorDTO();
+ processorDTO.setId(processorId);
+ processorDTO.setConfig(config);
+
+ // get the parent group id for the specified processor
+ String groupId = controllerFacade.findProcessGroupIdForProcessor(processorId);
+
+ // ensure the parent group id was found
+ if (groupId == null) {
+ throw new ResourceNotFoundException(String.format("Unable to locate Processor with id '%s'.", processorId));
+ }
+
+ // update the processor configuration
+ ProcessorNode processor = processorDAO.updateProcessor(groupId, processorDTO);
+
+ // save the flow
+ controllerFacade.save();
+
+ return dtoFactory.createProcessorDto(processor);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<Void> createArchive(Revision revision) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
+ public ConfigurationSnapshot<ControllerServiceDTO> createControllerService(final Revision revision, final ControllerServiceDTO controllerServiceDTO) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ControllerServiceDTO>() {
+ @Override
+ public ControllerServiceDTO execute() {
+ // ensure id is set
+ if (StringUtils.isBlank(controllerServiceDTO.getId())) {
+ controllerServiceDTO.setId(UUID.randomUUID().toString());
+ }
- // create the archive
- controllerFacade.createArchive();
+ // create the controller service
+ final ControllerServiceNode controllerService = controllerServiceDAO.createControllerService(controllerServiceDTO);
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
- return response;
+ // save the update
+ if (properties.isClusterManager()) {
+ clusterManager.saveControllerServices();
+ } else {
+ controllerFacade.save();
+ }
+
+ return dtoFactory.createControllerServiceDto(controllerService);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<ProcessorDTO> setProcessorAnnotationData(Revision revision, String processorId, String annotationData) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
+ public ConfigurationSnapshot<ControllerServiceDTO> updateControllerService(final Revision revision, final ControllerServiceDTO controllerServiceDTO) {
+ // if controller service does not exist, then create new controller service
+ if (controllerServiceDAO.hasControllerService(controllerServiceDTO.getId()) == false) {
+ return createControllerService(revision, controllerServiceDTO);
+ }
+
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ControllerServiceDTO>() {
+ @Override
+ public ControllerServiceDTO execute() {
+ final ControllerServiceNode controllerService = controllerServiceDAO.updateControllerService(controllerServiceDTO);
- // create the processor config
- final ProcessorConfigDTO config = new ProcessorConfigDTO();
- config.setAnnotationData(annotationData);
+ // save the update
+ if (properties.isClusterManager()) {
+ clusterManager.saveControllerServices();
+ } else {
+ controllerFacade.save();
+ }
- // create the processor dto
- final ProcessorDTO processorDTO = new ProcessorDTO();
- processorDTO.setId(processorId);
- processorDTO.setConfig(config);
+ return dtoFactory.createControllerServiceDto(controllerService);
+ }
+ });
+ }
- // get the parent group id for the specified processor
- String groupId = controllerFacade.findProcessGroupIdForProcessor(processorId);
+ @Override
+ public ConfigurationSnapshot<Set<ControllerServiceReferencingComponentDTO>> updateControllerServiceReferencingComponents(final Revision revision, final String controllerServiceId, final org.apache.nifi.controller.ScheduledState scheduledState, final org.apache.nifi.controller.service.ControllerServiceState controllerServiceState) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Set<ControllerServiceReferencingComponentDTO>>() {
+ @Override
+ public Set<ControllerServiceReferencingComponentDTO> execute() {
+ final ControllerServiceReference reference = controllerServiceDAO.updateControllerServiceReferencingComponents(controllerServiceId, scheduledState, controllerServiceState);
+ return dtoFactory.createControllerServiceReferencingComponentsDto(reference);
+ }
+ });
+ }
- // ensure the parent group id was found
- if (groupId == null) {
- throw new ResourceNotFoundException(String.format("Unable to locate Processor with id '%s'.", processorId));
+ @Override
+ public ConfigurationSnapshot<Void> deleteControllerService(final Revision revision, final String controllerServiceId) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+ @Override
+ public Void execute() {
+ // delete the controller service
+ controllerServiceDAO.deleteControllerService(controllerServiceId);
+
+ // save the update
+ if (properties.isClusterManager()) {
+ clusterManager.saveControllerServices();
+ } else {
+ controllerFacade.save();
+ }
+
+ return null;
+ }
+ });
+ }
+
+ @Override
+ public ConfigurationSnapshot<ReportingTaskDTO> createReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ReportingTaskDTO>() {
+ @Override
+ public ReportingTaskDTO execute() {
+ // ensure id is set
+ if (StringUtils.isBlank(reportingTaskDTO.getId())) {
+ reportingTaskDTO.setId(UUID.randomUUID().toString());
+ }
+
+ // create the reporting task
+ final ReportingTaskNode reportingTask = reportingTaskDAO.createReportingTask(reportingTaskDTO);
+
+ // save the update
+ if (properties.isClusterManager()) {
+ clusterManager.saveReportingTasks();
+ } else {
+ controllerFacade.save();
+ }
+
+ return dtoFactory.createReportingTaskDto(reportingTask);
+ }
+ });
+ }
+
+ @Override
+ public ConfigurationSnapshot<ReportingTaskDTO> updateReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) {
+ // if reporting task does not exist, then create new reporting task
+ if (reportingTaskDAO.hasReportingTask(reportingTaskDTO.getId()) == false) {
+ return createReportingTask(revision, reportingTaskDTO);
}
+
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ReportingTaskDTO>() {
+ @Override
+ public ReportingTaskDTO execute() {
+ final ReportingTaskNode reportingTask = reportingTaskDAO.updateReportingTask(reportingTaskDTO);
- // update the processor configuration
- ProcessorNode processor = processorDAO.updateProcessor(groupId, processorDTO);
+ // save the update
+ if (properties.isClusterManager()) {
+ clusterManager.saveReportingTasks();
+ } else {
+ controllerFacade.save();
+ }
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<ProcessorDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createProcessorDto(processor));
+ return dtoFactory.createReportingTaskDto(reportingTask);
+ }
+ });
+ }
- // save the flow
- controllerFacade.save();
+ @Override
+ public ConfigurationSnapshot<Void> deleteReportingTask(final Revision revision, final String reportingTaskId) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+ @Override
+ public Void execute() {
+ // delete the reporting task
+ reportingTaskDAO.deleteReportingTask(reportingTaskId);
- return response;
+ // save the update
+ if (properties.isClusterManager()) {
+ clusterManager.saveReportingTasks();
+ } else {
+ controllerFacade.save();
+ }
+
+ return null;
+ }
+ });
}
@Override
@@ -1408,9 +1488,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// -----------------------------------------
// Read Operations
// -----------------------------------------
+
@Override
public RevisionDTO getRevision() {
- return dtoFactory.createRevisionDTO(optimisticLockingManager.getRevision());
+ return dtoFactory.createRevisionDTO(optimisticLockingManager.getLastModification());
}
@Override
@@ -1637,6 +1718,16 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
+ public Set<DocumentedTypeDTO> getControllerServiceTypes() {
+ return controllerFacade.getControllerServiceTypes();
+ }
+
+ @Override
+ public Set<DocumentedTypeDTO> getReportingTaskTypes() {
+ return controllerFacade.getReportingTaskTypes();
+ }
+
+ @Override
public ProcessorDTO getProcessor(String groupId, String id) {
final ProcessorNode processor = processorDAO.getProcessor(groupId, id);
final ProcessorDTO processorDto = dtoFactory.createProcessorDto(processor);
@@ -1644,6 +1735,19 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
+ public PropertyDescriptorDTO getProcessorPropertyDescriptor(String groupId, String id, String property) {
+ final ProcessorNode processor = processorDAO.getProcessor(groupId, id);
+ PropertyDescriptor descriptor = processor.getPropertyDescriptor(property);
+
+ // return an invalid descriptor if the processor doesn't support this property
+ if (descriptor == null) {
+ descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build();
+ }
+
+ return dtoFactory.createPropertyDescriptorDto(descriptor);
+ }
+
+ @Override
public StatusHistoryDTO getProcessorStatusHistory(String groupId, String id) {
return controllerFacade.getProcessorStatusHistory(groupId, id);
}
@@ -1823,6 +1927,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final Date now = new Date();
controllerConfig.setTimeOffset(TimeZone.getDefault().getOffset(now.getTime()));
+ controllerConfig.setCurrentTime(now);
// determine the site to site configuration
if (isClustered()) {
@@ -1929,12 +2034,72 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public ConfigurationSnapshot<ProcessGroupDTO> getProcessGroup(String groupId, final boolean recurse) {
ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
- Long version = optimisticLockingManager.getRevision().getVersion();
- ConfigurationSnapshot<ProcessGroupDTO> response = new ConfigurationSnapshot<>(version, dtoFactory.createProcessGroupDto(processGroup, recurse));
+ Revision revision = optimisticLockingManager.getLastModification().getRevision();
+ ConfigurationSnapshot<ProcessGroupDTO> response = new ConfigurationSnapshot<>(revision.getVersion(), dtoFactory.createProcessGroupDto(processGroup, recurse));
return response;
}
@Override
+ public Set<ControllerServiceDTO> getControllerServices() {
+ final Set<ControllerServiceDTO> controllerServiceDtos = new LinkedHashSet<>();
+ for (ControllerServiceNode controllerService : controllerServiceDAO.getControllerServices()) {
+ controllerServiceDtos.add(dtoFactory.createControllerServiceDto(controllerService));
+ }
+ return controllerServiceDtos;
+ }
+
+ @Override
+ public ControllerServiceDTO getControllerService(String controllerServiceId) {
+ return dtoFactory.createControllerServiceDto(controllerServiceDAO.getControllerService(controllerServiceId));
+ }
+
+ @Override
+ public PropertyDescriptorDTO getControllerServicePropertyDescriptor(String id, String property) {
+ final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(id);
+ PropertyDescriptor descriptor = controllerService.getControllerServiceImplementation().getPropertyDescriptor(property);
+
+ // return an invalid descriptor if the controller service doesn't support this property
+ if (descriptor == null) {
+ descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build();
+ }
+
+ return dtoFactory.createPropertyDescriptorDto(descriptor);
+ }
+
+ @Override
+ public Set<ControllerServiceReferencingComponentDTO> getControllerServiceReferencingComponents(String controllerServiceId) {
+ final ControllerServiceNode service = controllerServiceDAO.getControllerService(controllerServiceId);
+ return dtoFactory.createControllerServiceReferencingComponentsDto(service.getReferences());
+ }
+
+ @Override
+ public Set<ReportingTaskDTO> getReportingTasks() {
+ final Set<ReportingTaskDTO> reportingTaskDtos = new LinkedHashSet<>();
+ for (ReportingTaskNode reportingTask : reportingTaskDAO.getReportingTasks()) {
+ reportingTaskDtos.add(dtoFactory.createReportingTaskDto(reportingTask));
+ }
+ return reportingTaskDtos;
+ }
+
+ @Override
+ public ReportingTaskDTO getReportingTask(String reportingTaskId) {
+ return dtoFactory.createReportingTaskDto(reportingTaskDAO.getReportingTask(reportingTaskId));
+ }
+
+ @Override
+ public PropertyDescriptorDTO getReportingTaskPropertyDescriptor(String id, String property) {
+ final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(id);
+ PropertyDescriptor descriptor = reportingTask.getReportingTask().getPropertyDescriptor(property);
+
+ // return an invalid descriptor if the reporting task doesn't support this property
+ if (descriptor == null) {
+ descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build();
+ }
+
+ return dtoFactory.createPropertyDescriptorDto(descriptor);
+ }
+
+ @Override
public StatusHistoryDTO getProcessGroupStatusHistory(String groupId) {
return controllerFacade.getProcessGroupStatusHistory(groupId);
}
@@ -1974,9 +2139,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
- public ProcessorHistoryDTO getProcessorHistory(String processorId) {
+ public ComponentHistoryDTO getComponentHistory(String componentId) {
final Map<String, PropertyHistoryDTO> propertyHistoryDtos = new LinkedHashMap<>();
- final Map<String, List<PreviousValue>> propertyHistory = auditService.getPreviousValues(processorId);
+ final Map<String, List<PreviousValue>> propertyHistory = auditService.getPreviousValues(componentId);
for (final Map.Entry<String, List<PreviousValue>> entry : propertyHistory.entrySet()) {
final List<PreviousValueDTO> previousValueDtos = new ArrayList<>();
@@ -1996,8 +2161,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
}
- final ProcessorHistoryDTO history = new ProcessorHistoryDTO();
- history.setProcessorId(processorId);
+ final ComponentHistoryDTO history = new ComponentHistoryDTO();
+ history.setComponentId(componentId);
history.setPropertyHistory(propertyHistoryDtos);
return history;
@@ -2718,6 +2883,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
this.processGroupDAO = processGroupDAO;
}
+ public void setControllerServiceDAO(ControllerServiceDAO controllerServiceDAO) {
+ this.controllerServiceDAO = controllerServiceDAO;
+ }
+
+ public void setReportingTaskDAO(ReportingTaskDAO reportingTaskDAO) {
+ this.reportingTaskDAO = reportingTaskDAO;
+ }
+
public void setTemplateDAO(TemplateDAO templateDAO) {
this.templateDAO = templateDAO;
}