You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/02/05 22:00:26 UTC
[2/3] incubator-nifi git commit: NIFI-250: - Refactoring revision
checking so that we can lock appropriately on the Cluster Manager to manage
controller services running there while other concurrent requests can be
replicated amongst the cluster.
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index f953585..f56d787 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -166,8 +166,6 @@ import org.springframework.security.access.AccessDeniedException;
public class StandardNiFiServiceFacade implements NiFiServiceFacade {
private static final Logger logger = LoggerFactory.getLogger(StandardNiFiServiceFacade.class);
- private static final String INVALID_REVISION_ERROR = "Given revision %s does not match current revision %s.";
- private static final String SYNC_ERROR = "This NiFi instance has been updated by '%s'. Please refresh to synchronize the view.";
// nifi core components
private ControllerFacade controllerFacade;
@@ -200,51 +198,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
private NiFiProperties properties;
private DtoFactory dtoFactory;
- /**
- * Checks the specified revision against the current revision.
- *
- * @param revision The revision to check
- * @param clientId The client id
- * @return Whether or not the request should proceed
- * @throws NiFiCoreException If the specified revision is not current
- */
- private void checkRevision(Revision revision) {
-
- boolean approved = optimisticLockingManager.isCurrent(revision);
-
- if (!approved) {
- Revision currentRevision = optimisticLockingManager.getRevision();
- logger.debug("Revision check failed because current revision is " + currentRevision + " but supplied revision is " + revision);
-
- if (StringUtils.isBlank(currentRevision.getClientId()) || currentRevision.getVersion() == null) {
- throw new InvalidRevisionException(String.format(INVALID_REVISION_ERROR, revision, currentRevision));
- } else {
- throw new InvalidRevisionException(String.format(SYNC_ERROR, optimisticLockingManager.getLastModifier()));
- }
- }
- }
-
- /**
- * Increments the revision and updates the last modifier.
- *
- * @param revision
- * @return
- */
- private Revision updateRevision(Revision revision) {
- // update the client id and modifier
- final Revision updatedRevision = optimisticLockingManager.incrementRevision(revision.getClientId());
-
- // get the nifi user to extract the username
- NiFiUser user = NiFiUserUtils.getNiFiUser();
- if (user == null) {
- optimisticLockingManager.setLastModifier("unknown");
- } else {
- optimisticLockingManager.setLastModifier(user.getUserName());
- }
-
- return updatedRevision;
- }
-
// -----------------------------------------
// Verification Operations
// -----------------------------------------
@@ -381,96 +334,85 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// -----------------------------------------
@Override
- public ConfigurationSnapshot<ConnectionDTO> updateConnection(Revision revision, String groupId, ConnectionDTO connectionDTO) {
-
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
+ public ConfigurationSnapshot<ConnectionDTO> updateConnection(final Revision revision, final String groupId, final ConnectionDTO connectionDTO) {
// if connection does not exist, then create new connection
if (connectionDAO.hasConnection(groupId, connectionDTO.getId()) == false) {
return createConnection(revision, groupId, connectionDTO);
}
+
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ConnectionDTO>() {
+ @Override
+ public ConnectionDTO execute() {
+ final Connection connection = connectionDAO.updateConnection(groupId, connectionDTO);
- final Connection connection = connectionDAO.updateConnection(groupId, connectionDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<ConnectionDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createConnectionDto(connection));
-
- // save the flow
- controllerFacade.save();
-
- return response;
+ controllerFacade.save();
+
+ return dtoFactory.createConnectionDto(connection);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<ProcessorDTO> updateProcessor(Revision revision, String groupId, ProcessorDTO processorDTO) {
-
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
+ public ConfigurationSnapshot<ProcessorDTO> updateProcessor(final Revision revision, final String groupId, final ProcessorDTO processorDTO) {
// if processor does not exist, then create new processor
if (processorDAO.hasProcessor(groupId, processorDTO.getId()) == false) {
return createProcessor(revision, groupId, processorDTO);
}
+
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessorDTO>() {
+ @Override
+ public ProcessorDTO execute() {
+ // update the processor
+ ProcessorNode processor = processorDAO.updateProcessor(groupId, processorDTO);
- // update the processor
- ProcessorNode processor = processorDAO.updateProcessor(groupId, processorDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<ProcessorDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createProcessorDto(processor));
-
- // save the flow
- controllerFacade.save();
-
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return dtoFactory.createProcessorDto(processor);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<LabelDTO> updateLabel(Revision revision, String groupId, LabelDTO labelDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
+ public ConfigurationSnapshot<LabelDTO> updateLabel(final Revision revision, final String groupId, final LabelDTO labelDTO) {
// if label does not exist, then create new label
if (labelDAO.hasLabel(groupId, labelDTO.getId()) == false) {
return createLabel(revision, groupId, labelDTO);
}
+
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<LabelDTO>() {
+ @Override
+ public LabelDTO execute() {
+ // update the existing label
+ final Label label = labelDAO.updateLabel(groupId, labelDTO);
- // update the existing label
- final Label label = labelDAO.updateLabel(groupId, labelDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<LabelDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createLabelDto(label));
-
- // save updated controller
- controllerFacade.save();
-
- return response;
+ // save updated controller
+ controllerFacade.save();
+
+ return dtoFactory.createLabelDto(label);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<FunnelDTO> updateFunnel(Revision revision, String groupId, FunnelDTO funnelDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
+ public ConfigurationSnapshot<FunnelDTO> updateFunnel(final Revision revision, final String groupId, final FunnelDTO funnelDTO) {
// if label does not exist, then create new label
if (funnelDAO.hasFunnel(groupId, funnelDTO.getId()) == false) {
return createFunnel(revision, groupId, funnelDTO);
}
+
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<FunnelDTO>() {
+ @Override
+ public FunnelDTO execute() {
+ // update the existing label
+ final Funnel funnel = funnelDAO.updateFunnel(groupId, funnelDTO);
- // update the existing label
- final Funnel funnel = funnelDAO.updateFunnel(groupId, funnelDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<FunnelDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createFunnelDto(funnel));
-
- // save updated controller
- controllerFacade.save();
-
- return response;
+ // save updated controller
+ controllerFacade.save();
+
+ return dtoFactory.createFunnelDto(funnel);
+ }
+ });
}
@Override
@@ -483,141 +425,126 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
- public ConfigurationSnapshot<SnippetDTO> updateSnippet(Revision revision, SnippetDTO snippetDto) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
+ public ConfigurationSnapshot<SnippetDTO> updateSnippet(final Revision revision, final SnippetDTO snippetDto) {
// if label does not exist, then create new label
if (snippetDAO.hasSnippet(snippetDto.getId()) == false) {
return createSnippet(revision, snippetDto);
}
+
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<SnippetDTO>() {
+ @Override
+ public SnippetDTO execute() {
+ // update the snippet
+ final Snippet snippet = snippetDAO.updateSnippet(snippetDto);
- // update the snippet
- final Snippet snippet = snippetDAO.updateSnippet(snippetDto);
-
- // build the snippet dto
- final SnippetDTO responseSnippetDto = dtoFactory.createSnippetDto(snippet);
- responseSnippetDto.setContents(snippetUtils.populateFlowSnippet(snippet, false));
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<SnippetDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), responseSnippetDto);
-
- // save updated controller if applicable
- if (snippetDto.getParentGroupId() != null && snippet.isLinked()) {
- controllerFacade.save();
- }
+ // build the snippet dto
+ final SnippetDTO responseSnippetDto = dtoFactory.createSnippetDto(snippet);
+ responseSnippetDto.setContents(snippetUtils.populateFlowSnippet(snippet, false));
- return response;
+ // save updated controller if applicable
+ if (snippetDto.getParentGroupId() != null && snippet.isLinked()) {
+ controllerFacade.save();
+ }
+
+ return responseSnippetDto;
+ }
+ });
}
@Override
- public ConfigurationSnapshot<PortDTO> updateInputPort(Revision revision, String groupId, PortDTO inputPortDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
+ public ConfigurationSnapshot<PortDTO> updateInputPort(final Revision revision, final String groupId, final PortDTO inputPortDTO) {
// if input port does not exist, then create new input port
if (inputPortDAO.hasPort(groupId, inputPortDTO.getId()) == false) {
return createInputPort(revision, groupId, inputPortDTO);
}
+
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<PortDTO>() {
+ @Override
+ public PortDTO execute() {
+ final Port inputPort = inputPortDAO.updatePort(groupId, inputPortDTO);
- final Port inputPort = inputPortDAO.updatePort(groupId, inputPortDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<PortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createPortDto(inputPort));
-
- // save updated controller
- controllerFacade.save();
-
- return response;
+ // save updated controller
+ controllerFacade.save();
+
+ return dtoFactory.createPortDto(inputPort);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<PortDTO> updateOutputPort(Revision revision, String groupId, PortDTO outputPortDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
+ public ConfigurationSnapshot<PortDTO> updateOutputPort(final Revision revision, final String groupId, final PortDTO outputPortDTO) {
// if output port does not exist, then create new output port
if (outputPortDAO.hasPort(groupId, outputPortDTO.getId()) == false) {
return createOutputPort(revision, groupId, outputPortDTO);
}
+
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<PortDTO>() {
+ @Override
+ public PortDTO execute() {
+ final Port outputPort = outputPortDAO.updatePort(groupId, outputPortDTO);
- final Port outputPort = outputPortDAO.updatePort(groupId, outputPortDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<PortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createPortDto(outputPort));
-
- // save updated controller
- controllerFacade.save();
-
- return response;
+ // save updated controller
+ controllerFacade.save();
+
+ return dtoFactory.createPortDto(outputPort);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<RemoteProcessGroupDTO> updateRemoteProcessGroup(Revision revision, String groupId, RemoteProcessGroupDTO remoteProcessGroupDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
+ public ConfigurationSnapshot<RemoteProcessGroupDTO> updateRemoteProcessGroup(final Revision revision, final String groupId, final RemoteProcessGroupDTO remoteProcessGroupDTO) {
// if controller reference does not exist, then create new controller reference
if (remoteProcessGroupDAO.hasRemoteProcessGroup(groupId, remoteProcessGroupDTO.getId()) == false) {
return createRemoteProcessGroup(revision, groupId, remoteProcessGroupDTO);
}
+
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<RemoteProcessGroupDTO>() {
+ @Override
+ public RemoteProcessGroupDTO execute() {
+ RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.updateRemoteProcessGroup(groupId, remoteProcessGroupDTO);
- RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.updateRemoteProcessGroup(groupId, remoteProcessGroupDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<RemoteProcessGroupDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup));
-
- // save updated controller
- controllerFacade.save();
-
- return response;
+ // save updated controller
+ controllerFacade.save();
+
+ return dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<RemoteProcessGroupPortDTO> updateRemoteProcessGroupInputPort(Revision revision, String groupId, String remoteProcessGroupId, RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // update the remote port
- RemoteGroupPort remoteGroupPort = remoteProcessGroupDAO.updateRemoteProcessGroupInputPort(groupId, remoteProcessGroupId, remoteProcessGroupPortDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<RemoteProcessGroupPortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort));
+ public ConfigurationSnapshot<RemoteProcessGroupPortDTO> updateRemoteProcessGroupInputPort(final Revision revision, final String groupId, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<RemoteProcessGroupPortDTO>() {
+ @Override
+ public RemoteProcessGroupPortDTO execute() {
+ // update the remote port
+ RemoteGroupPort remoteGroupPort = remoteProcessGroupDAO.updateRemoteProcessGroupInputPort(groupId, remoteProcessGroupId, remoteProcessGroupPortDTO);
- // save updated controller
- controllerFacade.save();
-
- return response;
+ // save updated controller
+ controllerFacade.save();
+
+ return dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<RemoteProcessGroupPortDTO> updateRemoteProcessGroupOutputPort(Revision revision, String groupId, String remoteProcessGroupId, RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // update the remote port
- RemoteGroupPort remoteGroupPort = remoteProcessGroupDAO.updateRemoteProcessGroupOutputPort(groupId, remoteProcessGroupId, remoteProcessGroupPortDTO);
+ public ConfigurationSnapshot<RemoteProcessGroupPortDTO> updateRemoteProcessGroupOutputPort(final Revision revision, final String groupId, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<RemoteProcessGroupPortDTO>() {
+ @Override
+ public RemoteProcessGroupPortDTO execute() {
+ // update the remote port
+ RemoteGroupPort remoteGroupPort = remoteProcessGroupDAO.updateRemoteProcessGroupOutputPort(groupId, remoteProcessGroupId, remoteProcessGroupPortDTO);
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<RemoteProcessGroupPortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort));
-
- // save updated controller
- controllerFacade.save();
-
- return response;
+ // save updated controller
+ controllerFacade.save();
+
+ return dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<ProcessGroupDTO> updateProcessGroup(Revision revision, String parentGroupId, ProcessGroupDTO processGroupDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
+ public ConfigurationSnapshot<ProcessGroupDTO> updateProcessGroup(final Revision revision, final String parentGroupId, final ProcessGroupDTO processGroupDTO) {
// if process group does not exist, then create new process group
if (processGroupDAO.hasProcessGroup(processGroupDTO.getId()) == false) {
if (parentGroupId == null) {
@@ -626,50 +553,49 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return createProcessGroup(parentGroupId, revision, processGroupDTO);
}
}
+
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessGroupDTO>() {
+ @Override
+ public ProcessGroupDTO execute() {
+ // update the process group
+ ProcessGroup processGroup = processGroupDAO.updateProcessGroup(processGroupDTO);
- // update the process group
- ProcessGroup processGroup = processGroupDAO.updateProcessGroup(processGroupDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<ProcessGroupDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createProcessGroupDto(processGroup));
-
- // save updated controller
- controllerFacade.save();
-
- return response;
+ // save updated controller
+ controllerFacade.save();
+
+ return dtoFactory.createProcessGroupDto(processGroup);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<ControllerConfigurationDTO> updateControllerConfiguration(Revision revision, ControllerConfigurationDTO controllerConfigurationDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // update the controller configuration through the proxy
- if (controllerConfigurationDTO.getName() != null) {
- controllerFacade.setName(controllerConfigurationDTO.getName());
- }
- if (controllerConfigurationDTO.getComments() != null) {
- controllerFacade.setComments(controllerConfigurationDTO.getComments());
- }
- if (controllerConfigurationDTO.getMaxTimerDrivenThreadCount() != null) {
- controllerFacade.setMaxTimerDrivenThreadCount(controllerConfigurationDTO.getMaxTimerDrivenThreadCount());
- }
- if (controllerConfigurationDTO.getMaxEventDrivenThreadCount() != null) {
- controllerFacade.setMaxEventDrivenThreadCount(controllerConfigurationDTO.getMaxEventDrivenThreadCount());
- }
-
- // create the controller configuration dto
- ControllerConfigurationDTO controllerConfig = getControllerConfiguration();
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<ControllerConfigurationDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), controllerConfig);
+ public ConfigurationSnapshot<ControllerConfigurationDTO> updateControllerConfiguration(final Revision revision, final ControllerConfigurationDTO controllerConfigurationDTO) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ControllerConfigurationDTO>() {
+ @Override
+ public ControllerConfigurationDTO execute() {
+ // update the controller configuration through the proxy
+ if (controllerConfigurationDTO.getName() != null) {
+ controllerFacade.setName(controllerConfigurationDTO.getName());
+ }
+ if (controllerConfigurationDTO.getComments() != null) {
+ controllerFacade.setComments(controllerConfigurationDTO.getComments());
+ }
+ if (controllerConfigurationDTO.getMaxTimerDrivenThreadCount() != null) {
+ controllerFacade.setMaxTimerDrivenThreadCount(controllerConfigurationDTO.getMaxTimerDrivenThreadCount());
+ }
+ if (controllerConfigurationDTO.getMaxEventDrivenThreadCount() != null) {
+ controllerFacade.setMaxEventDrivenThreadCount(controllerConfigurationDTO.getMaxEventDrivenThreadCount());
+ }
- // save the flow
- controllerFacade.save();
+ // create the controller configuration dto
+ ControllerConfigurationDTO controllerConfig = getControllerConfiguration();
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return controllerConfig;
+ }
+ });
}
@Override
@@ -702,74 +628,66 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
- public ConfigurationSnapshot<Void> deleteConnection(Revision revision, String groupId, String connectionId) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
+ public ConfigurationSnapshot<Void> deleteConnection(final Revision revision, final String groupId, final String connectionId) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>(){
+ @Override
+ public Void execute() {
+ connectionDAO.deleteConnection(groupId, connectionId);
- connectionDAO.deleteConnection(groupId, connectionId);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
-
- // save the flow
- controllerFacade.save();
-
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return null;
+ }
+ });
}
@Override
- public ConfigurationSnapshot<Void> deleteProcessor(Revision revision, String groupId, String processorId) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // delete the processor and synchronize the connection state
- processorDAO.deleteProcessor(groupId, processorId);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
+ public ConfigurationSnapshot<Void> deleteProcessor(final Revision revision, final String groupId, final String processorId) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+ @Override
+ public Void execute() {
+ // delete the processor and synchronize the connection state
+ processorDAO.deleteProcessor(groupId, processorId);
- // save the flow
- controllerFacade.save();
-
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return null;
+ }
+ });
}
@Override
- public ConfigurationSnapshot<Void> deleteLabel(Revision revision, String groupId, String labelId) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // delete the label
- labelDAO.deleteLabel(groupId, labelId);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
-
- // save the flow
- controllerFacade.save();
+ public ConfigurationSnapshot<Void> deleteLabel(final Revision revision, final String groupId, final String labelId) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+ @Override
+ public Void execute() {
+ // delete the label
+ labelDAO.deleteLabel(groupId, labelId);
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return null;
+ }
+ });
}
@Override
- public ConfigurationSnapshot<Void> deleteFunnel(Revision revision, String groupId, String funnelId) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // delete the label
- funnelDAO.deleteFunnel(groupId, funnelId);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
-
- // save the flow
- controllerFacade.save();
+ public ConfigurationSnapshot<Void> deleteFunnel(final Revision revision, final String groupId, final String funnelId) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+ @Override
+ public Void execute() {
+ // delete the label
+ funnelDAO.deleteFunnel(groupId, funnelId);
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return null;
+ }
+ });
}
@Override
@@ -778,95 +696,85 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
- public ConfigurationSnapshot<Void> deleteSnippet(Revision revision, String snippetId) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // determine if this snippet was linked to the data flow
- Snippet snippet = snippetDAO.getSnippet(snippetId);
- boolean linked = snippet.isLinked();
+ public ConfigurationSnapshot<Void> deleteSnippet(final Revision revision, final String snippetId) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+ @Override
+ public Void execute() {
+ // determine if this snippet was linked to the data flow
+ Snippet snippet = snippetDAO.getSnippet(snippetId);
+ boolean linked = snippet.isLinked();
- // delete the snippet
- snippetDAO.deleteSnippet(snippetId);
+ // delete the snippet
+ snippetDAO.deleteSnippet(snippetId);
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
-
- // save the flow if necessary
- if (linked) {
- controllerFacade.save();
- }
-
- return response;
+ // save the flow if necessary
+ if (linked) {
+ controllerFacade.save();
+ }
+
+ return null;
+ }
+ });
}
@Override
- public ConfigurationSnapshot<Void> deleteInputPort(Revision revision, String groupId, String inputPortId) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- inputPortDAO.deletePort(groupId, inputPortId);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
-
- // save the flow
- controllerFacade.save();
+ public ConfigurationSnapshot<Void> deleteInputPort(final Revision revision, final String groupId, final String inputPortId) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+ @Override
+ public Void execute() {
+ inputPortDAO.deletePort(groupId, inputPortId);
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return null;
+ }
+ });
}
@Override
- public ConfigurationSnapshot<Void> deleteOutputPort(Revision revision, String groupId, String outputPortId) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- outputPortDAO.deletePort(groupId, outputPortId);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
+ public ConfigurationSnapshot<Void> deleteOutputPort(final Revision revision, final String groupId, final String outputPortId) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+ @Override
+ public Void execute() {
+ outputPortDAO.deletePort(groupId, outputPortId);
- // save the flow
- controllerFacade.save();
-
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return null;
+ }
+ });
}
@Override
- public ConfigurationSnapshot<Void> deleteProcessGroup(Revision revision, String groupId) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- processGroupDAO.deleteProcessGroup(groupId);
+ public ConfigurationSnapshot<Void> deleteProcessGroup(final Revision revision, final String groupId) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+ @Override
+ public Void execute() {
+ processGroupDAO.deleteProcessGroup(groupId);
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
-
- // save the flow
- controllerFacade.save();
-
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return null;
+ }
+ });
}
@Override
- public ConfigurationSnapshot<Void> deleteRemoteProcessGroup(Revision revision, String groupId, String remoteProcessGroupId) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- remoteProcessGroupDAO.deleteRemoteProcessGroup(groupId, remoteProcessGroupId);
+ public ConfigurationSnapshot<Void> deleteRemoteProcessGroup(final Revision revision, final String groupId, final String remoteProcessGroupId) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+ @Override
+ public Void execute() {
+ remoteProcessGroupDAO.deleteRemoteProcessGroup(groupId, remoteProcessGroupId);
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
-
- // save the flow
- controllerFacade.save();
-
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return null;
+ }
+ });
}
@Override
@@ -876,97 +784,86 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
- public ConfigurationSnapshot<ConnectionDTO> createConnection(Revision revision, String groupId, ConnectionDTO connectionDTO) {
-
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // ensure id is set
- if (StringUtils.isBlank(connectionDTO.getId())) {
- connectionDTO.setId(UUID.randomUUID().toString());
- }
-
- final Connection connection = connectionDAO.createConnection(groupId, connectionDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<ConnectionDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createConnectionDto(connection));
+ public ConfigurationSnapshot<ConnectionDTO> createConnection(final Revision revision, final String groupId, final ConnectionDTO connectionDTO) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ConnectionDTO>() {
+ @Override
+ public ConnectionDTO execute() {
+ // ensure id is set
+ if (StringUtils.isBlank(connectionDTO.getId())) {
+ connectionDTO.setId(UUID.randomUUID().toString());
+ }
- // save the flow
- controllerFacade.save();
+ final Connection connection = connectionDAO.createConnection(groupId, connectionDTO);
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return dtoFactory.createConnectionDto(connection);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<ProcessorDTO> createProcessor(Revision revision, String groupId, ProcessorDTO processorDTO) {
-
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // ensure id is set
- if (StringUtils.isBlank(processorDTO.getId())) {
- processorDTO.setId(UUID.randomUUID().toString());
- }
-
- // create the processor
- final ProcessorNode processor = processorDAO.createProcessor(groupId, processorDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<ProcessorDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createProcessorDto(processor));
+ public ConfigurationSnapshot<ProcessorDTO> createProcessor(final Revision revision, final String groupId, final ProcessorDTO processorDTO) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessorDTO>() {
+ @Override
+ public ProcessorDTO execute() {
+ // ensure id is set
+ if (StringUtils.isBlank(processorDTO.getId())) {
+ processorDTO.setId(UUID.randomUUID().toString());
+ }
- // save the flow
- controllerFacade.save();
+ // create the processor
+ final ProcessorNode processor = processorDAO.createProcessor(groupId, processorDTO);
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return dtoFactory.createProcessorDto(processor);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<LabelDTO> createLabel(Revision revision, String groupId, LabelDTO labelDTO) {
-
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // ensure id is set
- if (StringUtils.isBlank(labelDTO.getId())) {
- labelDTO.setId(UUID.randomUUID().toString());
- }
-
- // add the label
- final Label label = labelDAO.createLabel(groupId, labelDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<LabelDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createLabelDto(label));
+ public ConfigurationSnapshot<LabelDTO> createLabel(final Revision revision, final String groupId, final LabelDTO labelDTO) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<LabelDTO>() {
+ @Override
+ public LabelDTO execute() {
+ // ensure id is set
+ if (StringUtils.isBlank(labelDTO.getId())) {
+ labelDTO.setId(UUID.randomUUID().toString());
+ }
- // save the flow
- controllerFacade.save();
+ // add the label
+ final Label label = labelDAO.createLabel(groupId, labelDTO);
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return dtoFactory.createLabelDto(label);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<FunnelDTO> createFunnel(Revision revision, String groupId, FunnelDTO funnelDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // ensure id is set
- if (StringUtils.isBlank(funnelDTO.getId())) {
- funnelDTO.setId(UUID.randomUUID().toString());
- }
-
- // add the label
- final Funnel funnel = funnelDAO.createFunnel(groupId, funnelDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<FunnelDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createFunnelDto(funnel));
+ public ConfigurationSnapshot<FunnelDTO> createFunnel(final Revision revision, final String groupId, final FunnelDTO funnelDTO) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<FunnelDTO>() {
+ @Override
+ public FunnelDTO execute() {
+ // ensure id is set
+ if (StringUtils.isBlank(funnelDTO.getId())) {
+ funnelDTO.setId(UUID.randomUUID().toString());
+ }
- // save the flow
- controllerFacade.save();
+ // add the label
+ final Funnel funnel = funnelDAO.createFunnel(groupId, funnelDTO);
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return dtoFactory.createFunnelDto(funnel);
+ }
+ });
}
private void validateSnippetContents(final FlowSnippetDTO flowSnippet, final String groupId) {
@@ -1025,139 +922,129 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
- public ConfigurationSnapshot<FlowSnippetDTO> copySnippet(Revision revision, String groupId, String snippetId, Double originX, Double originY) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // ensure id is set
- if (StringUtils.isBlank(snippetId)) {
- snippetId = UUID.randomUUID().toString();
- }
-
- // create the new snippet
- FlowSnippetDTO flowSnippet = snippetDAO.copySnippet(groupId, snippetId, originX, originY);
-
- // validate the new snippet
- validateSnippetContents(flowSnippet, groupId);
+ public ConfigurationSnapshot<FlowSnippetDTO> copySnippet(final Revision revision, final String groupId, final String snippetId, final Double originX, final Double originY) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<FlowSnippetDTO>() {
+ @Override
+ public FlowSnippetDTO execute() {
+ String id = snippetId;
+
+ // ensure id is set
+ if (StringUtils.isBlank(id)) {
+ id = UUID.randomUUID().toString();
+ }
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<FlowSnippetDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), flowSnippet);
+ // create the new snippet
+ FlowSnippetDTO flowSnippet = snippetDAO.copySnippet(groupId, id, originX, originY);
- // save the flow
- controllerFacade.save();
+ // validate the new snippet
+ validateSnippetContents(flowSnippet, groupId);
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return flowSnippet;
+ }
+ });
}
@Override
public ConfigurationSnapshot<SnippetDTO> createSnippet(final Revision revision, final SnippetDTO snippetDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // ensure id is set
- if (StringUtils.isBlank(snippetDTO.getId())) {
- snippetDTO.setId(UUID.randomUUID().toString());
- }
-
- // add the snippet
- final Snippet snippet = snippetDAO.createSnippet(snippetDTO);
- final SnippetDTO responseSnippetDTO = dtoFactory.createSnippetDto(snippet);
- responseSnippetDTO.setContents(snippetUtils.populateFlowSnippet(snippet, false));
-
- // create the response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<SnippetDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), responseSnippetDTO);
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<SnippetDTO>() {
+ @Override
+ public SnippetDTO execute() {
+ // ensure id is set
+ if (StringUtils.isBlank(snippetDTO.getId())) {
+ snippetDTO.setId(UUID.randomUUID().toString());
+ }
- return response;
+ // add the snippet
+ final Snippet snippet = snippetDAO.createSnippet(snippetDTO);
+ final SnippetDTO responseSnippetDTO = dtoFactory.createSnippetDto(snippet);
+ responseSnippetDTO.setContents(snippetUtils.populateFlowSnippet(snippet, false));
+
+ return responseSnippetDTO;
+ }
+ });
}
@Override
- public ConfigurationSnapshot<PortDTO> createInputPort(Revision revision, String groupId, PortDTO inputPortDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // ensure id is set
- if (StringUtils.isBlank(inputPortDTO.getId())) {
- inputPortDTO.setId(UUID.randomUUID().toString());
- }
-
- final Port inputPort = inputPortDAO.createPort(groupId, inputPortDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<PortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createPortDto(inputPort));
+ public ConfigurationSnapshot<PortDTO> createInputPort(final Revision revision, final String groupId, final PortDTO inputPortDTO) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<PortDTO>() {
+ @Override
+ public PortDTO execute() {
+ // ensure id is set
+ if (StringUtils.isBlank(inputPortDTO.getId())) {
+ inputPortDTO.setId(UUID.randomUUID().toString());
+ }
- // save the flow
- controllerFacade.save();
+ final Port inputPort = inputPortDAO.createPort(groupId, inputPortDTO);
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return dtoFactory.createPortDto(inputPort);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<PortDTO> createOutputPort(Revision revision, String groupId, PortDTO outputPortDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // ensure id is set
- if (StringUtils.isBlank(outputPortDTO.getId())) {
- outputPortDTO.setId(UUID.randomUUID().toString());
- }
-
- final Port outputPort = outputPortDAO.createPort(groupId, outputPortDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<PortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createPortDto(outputPort));
+ public ConfigurationSnapshot<PortDTO> createOutputPort(final Revision revision, final String groupId, final PortDTO outputPortDTO) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<PortDTO>() {
+ @Override
+ public PortDTO execute() {
+ // ensure id is set
+ if (StringUtils.isBlank(outputPortDTO.getId())) {
+ outputPortDTO.setId(UUID.randomUUID().toString());
+ }
- // save the flow
- controllerFacade.save();
+ final Port outputPort = outputPortDAO.createPort(groupId, outputPortDTO);
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return dtoFactory.createPortDto(outputPort);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<ProcessGroupDTO> createProcessGroup(String parentGroupId, Revision revision, ProcessGroupDTO processGroupDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // ensure id is set
- if (StringUtils.isBlank(processGroupDTO.getId())) {
- processGroupDTO.setId(UUID.randomUUID().toString());
- }
-
- final ProcessGroup processGroup = processGroupDAO.createProcessGroup(parentGroupId, processGroupDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<ProcessGroupDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createProcessGroupDto(processGroup));
+ public ConfigurationSnapshot<ProcessGroupDTO> createProcessGroup(final String parentGroupId, final Revision revision, final ProcessGroupDTO processGroupDTO) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessGroupDTO>() {
+ @Override
+ public ProcessGroupDTO execute() {
+ // ensure id is set
+ if (StringUtils.isBlank(processGroupDTO.getId())) {
+ processGroupDTO.setId(UUID.randomUUID().toString());
+ }
- // save the flow
- controllerFacade.save();
+ final ProcessGroup processGroup = processGroupDAO.createProcessGroup(parentGroupId, processGroupDTO);
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return dtoFactory.createProcessGroupDto(processGroup);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<RemoteProcessGroupDTO> createRemoteProcessGroup(Revision revision, String groupId, RemoteProcessGroupDTO remoteProcessGroupDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // ensure id is set
- if (StringUtils.isBlank(remoteProcessGroupDTO.getId())) {
- remoteProcessGroupDTO.setId(UUID.randomUUID().toString());
- }
-
- final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.createRemoteProcessGroup(groupId, remoteProcessGroupDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<RemoteProcessGroupDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup));
+ public ConfigurationSnapshot<RemoteProcessGroupDTO> createRemoteProcessGroup(final Revision revision, final String groupId, final RemoteProcessGroupDTO remoteProcessGroupDTO) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<RemoteProcessGroupDTO>() {
+ @Override
+ public RemoteProcessGroupDTO execute() {
+ // ensure id is set
+ if (StringUtils.isBlank(remoteProcessGroupDTO.getId())) {
+ remoteProcessGroupDTO.setId(UUID.randomUUID().toString());
+ }
- // save the flow
- controllerFacade.save();
+ final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.createRemoteProcessGroup(groupId, remoteProcessGroupDTO);
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup);
+ }
+ });
}
@Override
@@ -1203,148 +1090,137 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
- public ConfigurationSnapshot<FlowSnippetDTO> createTemplateInstance(Revision revision, String groupId, Double originX, Double originY, String templateId) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // instantiate the template - there is no need to make another copy of the flow snippet since the actual template
- // was copied and this dto is only used to instantiate it's components (which as already completed)
- FlowSnippetDTO flowSnippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateId);
+ public ConfigurationSnapshot<FlowSnippetDTO> createTemplateInstance(final Revision revision, final String groupId, final Double originX, final Double originY, final String templateId) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<FlowSnippetDTO>() {
+ @Override
+ public FlowSnippetDTO execute() {
+ // instantiate the template - there is no need to make another copy of the flow snippet since the actual template
+ // was copied and this dto is only used to instantiate it's components (which as already completed)
+ FlowSnippetDTO flowSnippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateId);
- // validate the new snippet
- validateSnippetContents(flowSnippet, groupId);
+ // validate the new snippet
+ validateSnippetContents(flowSnippet, groupId);
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<FlowSnippetDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), flowSnippet);
-
- // save the flow
- controllerFacade.save();
-
- return response;
+ // save the flow
+ controllerFacade.save();
+
+ return flowSnippet;
+ }
+ });
}
@Override
- public ConfigurationSnapshot<Void> createArchive(Revision revision) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // create the archive
- controllerFacade.createArchive();
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
- return response;
+ public ConfigurationSnapshot<Void> createArchive(final Revision revision) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+ @Override
+ public Void execute() {
+ // create the archive
+ controllerFacade.createArchive();
+ return null;
+ }
+ });
}
@Override
- public ConfigurationSnapshot<ProcessorDTO> setProcessorAnnotationData(Revision revision, String processorId, String annotationData) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // create the processor config
- final ProcessorConfigDTO config = new ProcessorConfigDTO();
- config.setAnnotationData(annotationData);
+ public ConfigurationSnapshot<ProcessorDTO> setProcessorAnnotationData(final Revision revision, final String processorId, final String annotationData) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessorDTO>() {
+ @Override
+ public ProcessorDTO execute() {
+ // create the processor config
+ final ProcessorConfigDTO config = new ProcessorConfigDTO();
+ config.setAnnotationData(annotationData);
- // create the processor dto
- final ProcessorDTO processorDTO = new ProcessorDTO();
- processorDTO.setId(processorId);
- processorDTO.setConfig(config);
+ // create the processor dto
+ final ProcessorDTO processorDTO = new ProcessorDTO();
+ processorDTO.setId(processorId);
+ processorDTO.setConfig(config);
- // get the parent group id for the specified processor
- String groupId = controllerFacade.findProcessGroupIdForProcessor(processorId);
+ // get the parent group id for the specified processor
+ String groupId = controllerFacade.findProcessGroupIdForProcessor(processorId);
- // ensure the parent group id was found
- if (groupId == null) {
- throw new ResourceNotFoundException(String.format("Unable to locate Processor with id '%s'.", processorId));
- }
-
- // update the processor configuration
- ProcessorNode processor = processorDAO.updateProcessor(groupId, processorDTO);
+ // ensure the parent group id was found
+ if (groupId == null) {
+ throw new ResourceNotFoundException(String.format("Unable to locate Processor with id '%s'.", processorId));
+ }
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<ProcessorDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createProcessorDto(processor));
+ // update the processor configuration
+ ProcessorNode processor = processorDAO.updateProcessor(groupId, processorDTO);
- // save the flow
- controllerFacade.save();
+ // save the flow
+ controllerFacade.save();
- return response;
+ return dtoFactory.createProcessorDto(processor);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<ControllerServiceDTO> createControllerService(Revision revision, ControllerServiceDTO controllerServiceDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // ensure id is set
- if (StringUtils.isBlank(controllerServiceDTO.getId())) {
- controllerServiceDTO.setId(UUID.randomUUID().toString());
- }
-
- final ControllerServiceNode controllerService = controllerServiceDAO.createControllerService(controllerServiceDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<ControllerServiceDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createControllerServiceDto(controllerService));
+ public ConfigurationSnapshot<ControllerServiceDTO> createControllerService(final Revision revision, final ControllerServiceDTO controllerServiceDTO) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ControllerServiceDTO>() {
+ @Override
+ public ControllerServiceDTO execute() {
+ // ensure id is set
+ if (StringUtils.isBlank(controllerServiceDTO.getId())) {
+ controllerServiceDTO.setId(UUID.randomUUID().toString());
+ }
- // save the update
- if (properties.isClusterManager()) {
- clusterManager.saveControllerServices();
- } else {
- controllerFacade.save();
- }
+ // create the controller service
+ final ControllerServiceNode controllerService = controllerServiceDAO.createControllerService(controllerServiceDTO);
- return response;
+ // save the update
+ if (properties.isClusterManager()) {
+ clusterManager.saveControllerServices();
+ } else {
+ controllerFacade.save();
+ }
+
+ return dtoFactory.createControllerServiceDto(controllerService);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<ControllerServiceDTO> updateControllerService(Revision revision, ControllerServiceDTO controllerServiceDTO) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
+ public ConfigurationSnapshot<ControllerServiceDTO> updateControllerService(final Revision revision, final ControllerServiceDTO controllerServiceDTO) {
// if controller service does not exist, then create new controller service
if (controllerServiceDAO.hasControllerService(controllerServiceDTO.getId()) == false) {
return createControllerService(revision, controllerServiceDTO);
}
+
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ControllerServiceDTO>() {
+ @Override
+ public ControllerServiceDTO execute() {
+ final ControllerServiceNode controllerService = controllerServiceDAO.updateControllerService(controllerServiceDTO);
- final ControllerServiceNode controllerService = controllerServiceDAO.updateControllerService(controllerServiceDTO);
-
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<ControllerServiceDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createControllerServiceDto(controllerService));
-
- // save the update
- if (properties.isClusterManager()) {
- clusterManager.saveControllerServices();
- } else {
- controllerFacade.save();
- }
+ // save the update
+ if (properties.isClusterManager()) {
+ clusterManager.saveControllerServices();
+ } else {
+ controllerFacade.save();
+ }
- return response;
+ return dtoFactory.createControllerServiceDto(controllerService);
+ }
+ });
}
@Override
- public ConfigurationSnapshot<Void> deleteControllerService(Revision revision, String controllerServiceId) {
- // ensure the proper revision before performing the update
- checkRevision(revision);
-
- // delete the label
- controllerServiceDAO.deleteControllerService(controllerServiceId);
+ public ConfigurationSnapshot<Void> deleteControllerService(final Revision revision, final String controllerServiceId) {
+ return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+ @Override
+ public Void execute() {
+ // delete the label
+ controllerServiceDAO.deleteControllerService(controllerServiceId);
- // update the revision and generate a response
- final Revision updatedRevision = updateRevision(revision);
- final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
-
- // save the update
- if (properties.isClusterManager()) {
- clusterManager.saveControllerServices();
- } else {
- controllerFacade.save();
- }
-
- return response;
+ // save the update
+ if (properties.isClusterManager()) {
+ clusterManager.saveControllerServices();
+ } else {
+ controllerFacade.save();
+ }
+
+ return null;
+ }
+ });
}
@Override
@@ -1499,9 +1375,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// -----------------------------------------
// Read Operations
// -----------------------------------------
+
@Override
public RevisionDTO getRevision() {
- return dtoFactory.createRevisionDTO(optimisticLockingManager.getRevision());
+ return dtoFactory.createRevisionDTO(optimisticLockingManager.getLastModification());
}
@Override
@@ -2031,8 +1908,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public ConfigurationSnapshot<ProcessGroupDTO> getProcessGroup(String groupId, final boolean recurse) {
ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
- Long version = optimisticLockingManager.getRevision().getVersion();
- ConfigurationSnapshot<ProcessGroupDTO> response = new ConfigurationSnapshot<>(version, dtoFactory.createProcessGroupDto(processGroup, recurse));
+ Revision revision = optimisticLockingManager.getLastModification().getRevision();
+ ConfigurationSnapshot<ProcessGroupDTO> response = new ConfigurationSnapshot<>(revision.getVersion(), dtoFactory.createProcessGroupDto(processGroup, recurse));
return response;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
index 1b9ae7d..787fffa 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -178,53 +178,55 @@ public abstract class ApplicationResource {
// get cluster context from threadlocal
ClusterContext clusterCtx = ClusterContextThreadLocal.getContext();
+ if (clusterCtx != null) {
+
+ // serialize cluster context
+ String serializedClusterContext = WebUtils.serializeObjectToHex(clusterCtx);
+ if (serializedClusterContext.length() > CLUSTER_CONTEXT_HEADER_VALUE_MAX_BYTES) {
+ /*
+ * Actions is the only field that can vary in size. If we have no
+ * actions and we exceeded the header size, then basic assumptions
+ * about the cluster context have been violated.
+ */
+ if (clusterCtx.getActions().isEmpty()) {
+ throw new IllegalStateException(
+ String.format("Serialized Cluster context size '%d' is too big for response header", serializedClusterContext.length()));
+ }
- // serialize cluster context
- String serializedClusterContext = WebUtils.serializeObjectToHex(clusterCtx);
- if (serializedClusterContext.length() > CLUSTER_CONTEXT_HEADER_VALUE_MAX_BYTES) {
- /*
- * Actions is the only field that can vary in size. If we have no
- * actions and we exceeded the header size, then basic assumptions
- * about the cluster context have been violated.
- */
- if (clusterCtx.getActions().isEmpty()) {
- throw new IllegalStateException(
- String.format("Serialized Cluster context size '%d' is too big for response header", serializedClusterContext.length()));
- }
+ // use the first action as the prototype for creating the "batch" action
+ Action prototypeAction = clusterCtx.getActions().get(0);
- // use the first action as the prototype for creating the "batch" action
- Action prototypeAction = clusterCtx.getActions().get(0);
+ // log the batched actions
+ StringBuilder loggedActions = new StringBuilder();
+ createBatchedActionLogStatement(loggedActions, clusterCtx.getActions());
+ logger.info(loggedActions.toString());
- // log the batched actions
- StringBuilder loggedActions = new StringBuilder();
- createBatchedActionLogStatement(loggedActions, clusterCtx.getActions());
- logger.info(loggedActions.toString());
+ // remove current actions and replace with batch action
+ clusterCtx.getActions().clear();
- // remove current actions and replace with batch action
- clusterCtx.getActions().clear();
+ // create the batch action
+ Action batchAction = new Action();
+ batchAction.setOperation(Operation.Batch);
- // create the batch action
- Action batchAction = new Action();
- batchAction.setOperation(Operation.Batch);
+ // copy values from prototype action
+ batchAction.setTimestamp(prototypeAction.getTimestamp());
+ batchAction.setUserDn(prototypeAction.getUserDn());
+ batchAction.setUserName(prototypeAction.getUserName());
+ batchAction.setSourceId(prototypeAction.getSourceId());
+ batchAction.setSourceName(prototypeAction.getSourceName());
+ batchAction.setSourceType(prototypeAction.getSourceType());
- // copy values from prototype action
- batchAction.setTimestamp(prototypeAction.getTimestamp());
- batchAction.setUserDn(prototypeAction.getUserDn());
- batchAction.setUserName(prototypeAction.getUserName());
- batchAction.setSourceId(prototypeAction.getSourceId());
- batchAction.setSourceName(prototypeAction.getSourceName());
- batchAction.setSourceType(prototypeAction.getSourceType());
+ // add batch action
+ clusterCtx.getActions().add(batchAction);
- // add batch action
- clusterCtx.getActions().add(batchAction);
+ // create the final serialized copy of the cluster context
+ serializedClusterContext = WebUtils.serializeObjectToHex(clusterCtx);
+ }
- // create the final serialized copy of the cluster context
- serializedClusterContext = WebUtils.serializeObjectToHex(clusterCtx);
+ // put serialized cluster context in response header
+ response.header(WebClusterManager.CLUSTER_CONTEXT_HTTP_HEADER, serializedClusterContext);
}
- // put serialized cluster context in response header
- response.header(WebClusterManager.CLUSTER_CONTEXT_HTTP_HEADER, serializedClusterContext);
-
return response;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
index e87f388..3a74782 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
@@ -379,7 +379,7 @@ public class ClusterResource extends ApplicationResource {
// update the revision
RevisionDTO updatedRevision = new RevisionDTO();
updatedRevision.setClientId(revision.getClientId());
- updatedRevision.setVersion(controllerResponse.getRevision());
+ updatedRevision.setVersion(controllerResponse.getVersion());
// generate the response entity
final ProcessorEntity entity = new ProcessorEntity();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
index a941444..5d233f7 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
@@ -450,7 +450,7 @@ public class ConnectionResource extends ApplicationResource {
// get the updated revision
final RevisionDTO updatedRevision = new RevisionDTO();
updatedRevision.setClientId(revision.getClientId());
- updatedRevision.setVersion(controllerResponse.getRevision());
+ updatedRevision.setVersion(controllerResponse.getVersion());
// create the response entity
ConnectionEntity entity = new ConnectionEntity();
@@ -684,7 +684,7 @@ public class ConnectionResource extends ApplicationResource {
// get the updated revision
final RevisionDTO updatedRevision = new RevisionDTO();
updatedRevision.setClientId(revision.getClientId());
- updatedRevision.setVersion(controllerResponse.getRevision());
+ updatedRevision.setVersion(controllerResponse.getVersion());
// create the response entity
ConnectionEntity entity = new ConnectionEntity();
@@ -742,7 +742,7 @@ public class ConnectionResource extends ApplicationResource {
// create the revision
final RevisionDTO updatedRevision = new RevisionDTO();
updatedRevision.setClientId(clientId.getClientId());
- updatedRevision.setVersion(controllerResponse.getRevision());
+ updatedRevision.setVersion(controllerResponse.getVersion());
// create the response entity
final ConnectionEntity entity = new ConnectionEntity();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
index a77e9ea..f8c539d 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
@@ -315,7 +315,7 @@ public class ControllerResource extends ApplicationResource {
// create the revision
final RevisionDTO updatedRevision = new RevisionDTO();
updatedRevision.setClientId(clientId.getClientId());
- updatedRevision.setVersion(controllerResponse.getRevision());
+ updatedRevision.setVersion(controllerResponse.getVersion());
// create the response entity
final ProcessGroupEntity controllerEntity = new ProcessGroupEntity();
@@ -337,11 +337,6 @@ public class ControllerResource extends ApplicationResource {
@PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
@TypeHint(Entity.class)
public Response getRevision() {
- // replicate if cluster manager
- if (properties.isClusterManager()) {
- return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
- }
-
// create the current revision
final RevisionDTO revision = serviceFacade.getRevision();
@@ -607,7 +602,7 @@ public class ControllerResource extends ApplicationResource {
// get the updated revision
final RevisionDTO updatedRevision = new RevisionDTO();
updatedRevision.setClientId(revision.getClientId());
- updatedRevision.setVersion(controllerResponse.getRevision());
+ updatedRevision.setVersion(controllerResponse.getVersion());
// create the response entity
final ControllerConfigurationEntity entity = new ControllerConfigurationEntity();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
index 4b36f28..bccf218 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
@@ -281,7 +281,7 @@ public class ControllerServiceResource extends ApplicationResource {
// get the updated revision
final RevisionDTO updatedRevision = new RevisionDTO();
updatedRevision.setClientId(revision.getClientId());
- updatedRevision.setVersion(controllerResponse.getRevision());
+ updatedRevision.setVersion(controllerResponse.getVersion());
// build the response entity
final ControllerServiceEntity entity = new ControllerServiceEntity();
@@ -502,7 +502,7 @@ public class ControllerServiceResource extends ApplicationResource {
// get the updated revision
final RevisionDTO updatedRevision = new RevisionDTO();
updatedRevision.setClientId(revision.getClientId());
- updatedRevision.setVersion(controllerResponse.getRevision());
+ updatedRevision.setVersion(controllerResponse.getVersion());
// build the response entity
final ControllerServiceEntity entity = new ControllerServiceEntity();
@@ -563,7 +563,7 @@ public class ControllerServiceResource extends ApplicationResource {
// get the updated revision
final RevisionDTO revision = new RevisionDTO();
revision.setClientId(clientId.getClientId());
- revision.setVersion(controllerResponse.getRevision());
+ revision.setVersion(controllerResponse.getVersion());
// build the response entity
final ControllerServiceEntity entity = new ControllerServiceEntity();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java
index 4406c2e..3492de2 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java
@@ -244,7 +244,7 @@ public class FunnelResource extends ApplicationResource {
// get the updated revision
final RevisionDTO updatedRevision = new RevisionDTO();
updatedRevision.setClientId(revision.getClientId());
- updatedRevision.setVersion(controllerResponse.getRevision());
+ updatedRevision.setVersion(controllerResponse.getVersion());
// build the response entity
final FunnelEntity entity = new FunnelEntity();
@@ -408,7 +408,7 @@ public class FunnelResource extends ApplicationResource {
// get the updated revision
final RevisionDTO updatedRevision = new RevisionDTO();
updatedRevision.setClientId(revision.getClientId());
- updatedRevision.setVersion(controllerResponse.getRevision());
+ updatedRevision.setVersion(controllerResponse.getVersion());
// build the response entity
final FunnelEntity entity = new FunnelEntity();
@@ -465,7 +465,7 @@ public class FunnelResource extends ApplicationResource {
// get the updated revision
final RevisionDTO revision = new RevisionDTO();
revision.setClientId(clientId.getClientId());
- revision.setVersion(controllerResponse.getRevision());
+ revision.setVersion(controllerResponse.getVersion());
// build the response entity
final FunnelEntity entity = new FunnelEntity();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java
index 58c3c9e..f3a6326 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java
@@ -251,7 +251,7 @@ public class InputPortResource extends ApplicationResource {
// get the updated revision
final RevisionDTO updatedRevision = new RevisionDTO();
updatedRevision.setClientId(revision.getClientId());
- updatedRevision.setVersion(controllerResponse.getRevision());
+ updatedRevision.setVersion(controllerResponse.getVersion());
// build the response entity
final InputPortEntity entity = new InputPortEntity();
@@ -446,7 +446,7 @@ public class InputPortResource extends ApplicationResource {
// get the updated revision
final RevisionDTO updatedRevision = new RevisionDTO();
updatedRevision.setClientId(revision.getClientId());
- updatedRevision.setVersion(controllerResponse.getRevision());
+ updatedRevision.setVersion(controllerResponse.getVersion());
// build the response entity
final InputPortEntity entity = new InputPortEntity();
@@ -503,7 +503,7 @@ public class InputPortResource extends ApplicationResource {
// get the updated revision
final RevisionDTO revision = new RevisionDTO();
revision.setClientId(clientId.getClientId());
- revision.setVersion(controllerResponse.getRevision());
+ revision.setVersion(controllerResponse.getVersion());
// build the response entity
final InputPortEntity entity = new InputPortEntity();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4a8da603/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java
index 9a61cfc..6435671 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java
@@ -260,7 +260,7 @@ public class LabelResource extends ApplicationResource {
// get the updated revision
final RevisionDTO updatedRevision = new RevisionDTO();
updatedRevision.setClientId(revision.getClientId());
- updatedRevision.setVersion(controllerResponse.getRevision());
+ updatedRevision.setVersion(controllerResponse.getVersion());
// build the response entity
final LabelEntity entity = new LabelEntity();
@@ -463,7 +463,7 @@ public class LabelResource extends ApplicationResource {
// get the updated revision
final RevisionDTO updatedRevision = new RevisionDTO();
updatedRevision.setClientId(revision.getClientId());
- updatedRevision.setVersion(controllerResponse.getRevision());
+ updatedRevision.setVersion(controllerResponse.getVersion());
// build the response entity
final LabelEntity entity = new LabelEntity();
@@ -519,7 +519,7 @@ public class LabelResource extends ApplicationResource {
// get the updated revision
final RevisionDTO revision = new RevisionDTO();
revision.setClientId(clientId.getClientId());
- revision.setVersion(controllerResponse.getRevision());
+ revision.setVersion(controllerResponse.getVersion());
// build the response entity
final LabelEntity entity = new LabelEntity();