You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/04/10 15:43:50 UTC

[15/62] [abbrv] incubator-nifi git commit: Squashed commit of the following:

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 14640d8..086c46b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -106,7 +106,7 @@ import org.apache.nifi.web.api.dto.PreviousValueDTO;
 import org.apache.nifi.web.api.dto.ProcessGroupDTO;
 import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
 import org.apache.nifi.web.api.dto.ProcessorDTO;
-import org.apache.nifi.web.api.dto.ProcessorHistoryDTO;
+import org.apache.nifi.web.api.dto.ComponentHistoryDTO;
 import org.apache.nifi.web.api.dto.PropertyHistoryDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
@@ -152,6 +152,19 @@ import org.apache.nifi.web.util.SnippetUtils;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceReference;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
+import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
+import org.apache.nifi.web.api.dto.ReportingTaskDTO;
+import org.apache.nifi.web.dao.ControllerServiceDAO;
+import org.apache.nifi.web.dao.ReportingTaskDAO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.security.access.AccessDeniedException;
@@ -162,8 +175,6 @@ import org.springframework.security.access.AccessDeniedException;
 public class StandardNiFiServiceFacade implements NiFiServiceFacade {
 
     private static final Logger logger = LoggerFactory.getLogger(StandardNiFiServiceFacade.class);
-    private static final String INVALID_REVISION_ERROR = "Given revision %s does not match current revision %s.";
-    private static final String SYNC_ERROR = "This NiFi instance has been updated by '%s'. Please refresh to synchronize the view.";
 
     // nifi core components
     private ControllerFacade controllerFacade;
@@ -182,6 +193,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     private PortDAO inputPortDAO;
     private PortDAO outputPortDAO;
     private ConnectionDAO connectionDAO;
+    private ControllerServiceDAO controllerServiceDAO;
+    private ReportingTaskDAO reportingTaskDAO;
     private TemplateDAO templateDAO;
 
     // administrative services
@@ -195,54 +208,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     private NiFiProperties properties;
     private DtoFactory dtoFactory;
 
-    /**
-     * Checks the specified revision against the current revision.
-     *
-     * @param revision The revision to check
-     * @param clientId The client id
-     * @return Whether or not the request should proceed
-     * @throws NiFiCoreException If the specified revision is not current
-     */
-    private void checkRevision(Revision revision) {
-
-        boolean approved = optimisticLockingManager.isCurrent(revision);
-
-        if (!approved) {
-            Revision currentRevision = optimisticLockingManager.getRevision();
-            logger.debug("Revision check failed because current revision is " + currentRevision + " but supplied revision is " + revision);
-
-            if (StringUtils.isBlank(currentRevision.getClientId()) || currentRevision.getVersion() == null) {
-                throw new InvalidRevisionException(String.format(INVALID_REVISION_ERROR, revision, currentRevision));
-            } else {
-                throw new InvalidRevisionException(String.format(SYNC_ERROR, optimisticLockingManager.getLastModifier()));
-            }
-        }
-    }
-
-    /**
-     * Increments the revision and updates the last modifier.
-     *
-     * @param revision
-     * @return
-     */
-    private Revision updateRevision(Revision revision) {
-        // update the client id and modifier
-        final Revision updatedRevision = optimisticLockingManager.incrementRevision(revision.getClientId());
-
-        // get the nifi user to extract the username
-        NiFiUser user = NiFiUserUtils.getNiFiUser();
-        if (user == null) {
-            optimisticLockingManager.setLastModifier("unknown");
-        } else {
-            optimisticLockingManager.setLastModifier(user.getUserName());
-        }
-
-        return updatedRevision;
-    }
-
     // -----------------------------------------
     // Verification Operations
     // -----------------------------------------
+    
     @Override
     public void verifyCreateConnection(String groupId, ConnectionDTO connectionDTO) {
         connectionDAO.verifyCreate(groupId, connectionDTO);
@@ -360,100 +329,123 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         remoteProcessGroupDAO.verifyDelete(groupId, remoteProcessGroupId);
     }
 
+    @Override
+    public void verifyUpdateControllerService(ControllerServiceDTO controllerServiceDTO) {
+        // if the service does not exist yet, this update request is most likely creating it,
+        // so we skip verification because it would fail for a service that is not there
+        if (controllerServiceDAO.hasControllerService(controllerServiceDTO.getId())) {
+            controllerServiceDAO.verifyUpdate(controllerServiceDTO);
+        }
+    }
+
+    @Override
+    public void verifyUpdateControllerServiceReferencingComponents(String controllerServiceId, ScheduledState scheduledState, ControllerServiceState controllerServiceState) {
+        controllerServiceDAO.verifyUpdateReferencingComponents(controllerServiceId, scheduledState, controllerServiceState);
+    }
+
+    @Override
+    public void verifyDeleteControllerService(String controllerServiceId) {
+        controllerServiceDAO.verifyDelete(controllerServiceId);
+    }
+
+    @Override
+    public void verifyUpdateReportingTask(ReportingTaskDTO reportingTaskDTO) {
+        // if the task does not exist yet, this update request is most likely creating it,
+        // so we skip verification because it would fail for a task that is not there
+        if (reportingTaskDAO.hasReportingTask(reportingTaskDTO.getId())) {
+            reportingTaskDAO.verifyUpdate(reportingTaskDTO);
+        }
+    }
+
+    @Override
+    public void verifyDeleteReportingTask(String reportingTaskId) {
+        reportingTaskDAO.verifyDelete(reportingTaskId);
+    }
+
     // -----------------------------------------
     // Write Operations
     // -----------------------------------------
+    
     @Override
-    public ConfigurationSnapshot<ConnectionDTO> updateConnection(Revision revision, String groupId, ConnectionDTO connectionDTO) {
-
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
+    public ConfigurationSnapshot<ConnectionDTO> updateConnection(final Revision revision, final String groupId, final ConnectionDTO connectionDTO) {
         // if connection does not exist, then create new connection
         if (connectionDAO.hasConnection(groupId, connectionDTO.getId()) == false) {
             return createConnection(revision, groupId, connectionDTO);
         }
+        
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ConnectionDTO>() {
+            @Override
+            public ConnectionDTO execute() {
+                final Connection connection = connectionDAO.updateConnection(groupId, connectionDTO);
 
-        final Connection connection = connectionDAO.updateConnection(groupId, connectionDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<ConnectionDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createConnectionDto(connection));
-
-        // save the flow
-        controllerFacade.save();
-
-        return response;
+                controllerFacade.save();
+                
+                return dtoFactory.createConnectionDto(connection);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<ProcessorDTO> updateProcessor(Revision revision, String groupId, ProcessorDTO processorDTO) {
-
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
+    public ConfigurationSnapshot<ProcessorDTO> updateProcessor(final Revision revision, final String groupId, final ProcessorDTO processorDTO) {
         // if processor does not exist, then create new processor
         if (processorDAO.hasProcessor(groupId, processorDTO.getId()) == false) {
             return createProcessor(revision, groupId, processorDTO);
         }
+        
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessorDTO>() {
+            @Override
+            public ProcessorDTO execute() {
+                // update the processor
+                ProcessorNode processor = processorDAO.updateProcessor(groupId, processorDTO);
 
-        // update the processor
-        ProcessorNode processor = processorDAO.updateProcessor(groupId, processorDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<ProcessorDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createProcessorDto(processor));
-
-        // save the flow
-        controllerFacade.save();
-
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return dtoFactory.createProcessorDto(processor);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<LabelDTO> updateLabel(Revision revision, String groupId, LabelDTO labelDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
+    public ConfigurationSnapshot<LabelDTO> updateLabel(final Revision revision, final String groupId, final LabelDTO labelDTO) {
         // if label does not exist, then create new label
         if (labelDAO.hasLabel(groupId, labelDTO.getId()) == false) {
             return createLabel(revision, groupId, labelDTO);
         }
+        
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<LabelDTO>() {
+            @Override
+            public LabelDTO execute() {
+                // update the existing label
+                final Label label = labelDAO.updateLabel(groupId, labelDTO);
 
-        // update the existing label
-        final Label label = labelDAO.updateLabel(groupId, labelDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<LabelDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createLabelDto(label));
-
-        // save updated controller
-        controllerFacade.save();
-
-        return response;
+                // save updated controller
+                controllerFacade.save();
+                
+                return dtoFactory.createLabelDto(label);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<FunnelDTO> updateFunnel(Revision revision, String groupId, FunnelDTO funnelDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
+    public ConfigurationSnapshot<FunnelDTO> updateFunnel(final Revision revision, final String groupId, final FunnelDTO funnelDTO) {
         // if label does not exist, then create new label
         if (funnelDAO.hasFunnel(groupId, funnelDTO.getId()) == false) {
             return createFunnel(revision, groupId, funnelDTO);
         }
+        
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<FunnelDTO>() {
+            @Override
+            public FunnelDTO execute() {
+                // update the existing funnel
+                final Funnel funnel = funnelDAO.updateFunnel(groupId, funnelDTO);
 
-        // update the existing label
-        final Funnel funnel = funnelDAO.updateFunnel(groupId, funnelDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<FunnelDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createFunnelDto(funnel));
-
-        // save updated controller
-        controllerFacade.save();
-
-        return response;
+                // save updated controller
+                controllerFacade.save();
+                
+                return dtoFactory.createFunnelDto(funnel);
+            }
+        });
     }
 
     @Override
@@ -466,141 +458,126 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
-    public ConfigurationSnapshot<SnippetDTO> updateSnippet(Revision revision, SnippetDTO snippetDto) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
+    public ConfigurationSnapshot<SnippetDTO> updateSnippet(final Revision revision, final SnippetDTO snippetDto) {
         // if label does not exist, then create new label
         if (snippetDAO.hasSnippet(snippetDto.getId()) == false) {
             return createSnippet(revision, snippetDto);
         }
+        
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<SnippetDTO>() {
+            @Override
+            public SnippetDTO execute() {
+                // update the snippet
+                final Snippet snippet = snippetDAO.updateSnippet(snippetDto);
 
-        // update the snippet
-        final Snippet snippet = snippetDAO.updateSnippet(snippetDto);
+                // build the snippet dto
+                final SnippetDTO responseSnippetDto = dtoFactory.createSnippetDto(snippet);
+                responseSnippetDto.setContents(snippetUtils.populateFlowSnippet(snippet, false));
 
-        // build the snippet dto
-        final SnippetDTO responseSnippetDto = dtoFactory.createSnippetDto(snippet);
-        responseSnippetDto.setContents(snippetUtils.populateFlowSnippet(snippet, false));
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<SnippetDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), responseSnippetDto);
-
-        // save updated controller if applicable
-        if (snippetDto.getParentGroupId() != null && snippet.isLinked()) {
-            controllerFacade.save();
-        }
-
-        return response;
+                // save updated controller if applicable
+                if (snippetDto.getParentGroupId() != null && snippet.isLinked()) {
+                    controllerFacade.save();
+                }
+                
+                return responseSnippetDto;
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<PortDTO> updateInputPort(Revision revision, String groupId, PortDTO inputPortDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
+    public ConfigurationSnapshot<PortDTO> updateInputPort(final Revision revision, final String groupId, final PortDTO inputPortDTO) {
         // if input port does not exist, then create new input port
         if (inputPortDAO.hasPort(groupId, inputPortDTO.getId()) == false) {
             return createInputPort(revision, groupId, inputPortDTO);
         }
+        
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<PortDTO>() {
+            @Override
+            public PortDTO execute() {
+                final Port inputPort = inputPortDAO.updatePort(groupId, inputPortDTO);
 
-        final Port inputPort = inputPortDAO.updatePort(groupId, inputPortDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<PortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createPortDto(inputPort));
-
-        // save updated controller
-        controllerFacade.save();
-
-        return response;
+                // save updated controller
+                controllerFacade.save();
+                
+                return  dtoFactory.createPortDto(inputPort);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<PortDTO> updateOutputPort(Revision revision, String groupId, PortDTO outputPortDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
+    public ConfigurationSnapshot<PortDTO> updateOutputPort(final Revision revision, final String groupId, final PortDTO outputPortDTO) {
         // if output port does not exist, then create new output port
         if (outputPortDAO.hasPort(groupId, outputPortDTO.getId()) == false) {
             return createOutputPort(revision, groupId, outputPortDTO);
         }
+        
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<PortDTO>() {
+            @Override
+            public PortDTO execute() {
+                final Port outputPort = outputPortDAO.updatePort(groupId, outputPortDTO);
 
-        final Port outputPort = outputPortDAO.updatePort(groupId, outputPortDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<PortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createPortDto(outputPort));
-
-        // save updated controller
-        controllerFacade.save();
-
-        return response;
+                // save updated controller
+                controllerFacade.save();
+                
+                return dtoFactory.createPortDto(outputPort);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<RemoteProcessGroupDTO> updateRemoteProcessGroup(Revision revision, String groupId, RemoteProcessGroupDTO remoteProcessGroupDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
+    public ConfigurationSnapshot<RemoteProcessGroupDTO> updateRemoteProcessGroup(final Revision revision, final String groupId, final RemoteProcessGroupDTO remoteProcessGroupDTO) {
         // if controller reference does not exist, then create new controller reference
         if (remoteProcessGroupDAO.hasRemoteProcessGroup(groupId, remoteProcessGroupDTO.getId()) == false) {
             return createRemoteProcessGroup(revision, groupId, remoteProcessGroupDTO);
         }
+        
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<RemoteProcessGroupDTO>() {
+            @Override
+            public RemoteProcessGroupDTO execute() {
+                RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.updateRemoteProcessGroup(groupId, remoteProcessGroupDTO);
 
-        RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.updateRemoteProcessGroup(groupId, remoteProcessGroupDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<RemoteProcessGroupDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup));
-
-        // save updated controller
-        controllerFacade.save();
-
-        return response;
+                // save updated controller
+                controllerFacade.save();
+                
+                return dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<RemoteProcessGroupPortDTO> updateRemoteProcessGroupInputPort(Revision revision, String groupId, String remoteProcessGroupId, RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // update the remote port
-        RemoteGroupPort remoteGroupPort = remoteProcessGroupDAO.updateRemoteProcessGroupInputPort(groupId, remoteProcessGroupId, remoteProcessGroupPortDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<RemoteProcessGroupPortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort));
+    public ConfigurationSnapshot<RemoteProcessGroupPortDTO> updateRemoteProcessGroupInputPort(final Revision revision, final String groupId, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<RemoteProcessGroupPortDTO>() {
+            @Override
+            public RemoteProcessGroupPortDTO execute() {
+                // update the remote port
+                RemoteGroupPort remoteGroupPort = remoteProcessGroupDAO.updateRemoteProcessGroupInputPort(groupId, remoteProcessGroupId, remoteProcessGroupPortDTO);
 
-        // save updated controller
-        controllerFacade.save();
-
-        return response;
+                // save updated controller
+                controllerFacade.save();
+                
+                return dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<RemoteProcessGroupPortDTO> updateRemoteProcessGroupOutputPort(Revision revision, String groupId, String remoteProcessGroupId, RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // update the remote port
-        RemoteGroupPort remoteGroupPort = remoteProcessGroupDAO.updateRemoteProcessGroupOutputPort(groupId, remoteProcessGroupId, remoteProcessGroupPortDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<RemoteProcessGroupPortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort));
+    public ConfigurationSnapshot<RemoteProcessGroupPortDTO> updateRemoteProcessGroupOutputPort(final Revision revision, final String groupId, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<RemoteProcessGroupPortDTO>() {
+            @Override
+            public RemoteProcessGroupPortDTO execute() {
+                // update the remote port
+                RemoteGroupPort remoteGroupPort = remoteProcessGroupDAO.updateRemoteProcessGroupOutputPort(groupId, remoteProcessGroupId, remoteProcessGroupPortDTO);
 
-        // save updated controller
-        controllerFacade.save();
-
-        return response;
+                // save updated controller
+                controllerFacade.save();
+                
+                return dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<ProcessGroupDTO> updateProcessGroup(Revision revision, String parentGroupId, ProcessGroupDTO processGroupDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
+    public ConfigurationSnapshot<ProcessGroupDTO> updateProcessGroup(final Revision revision, final String parentGroupId, final ProcessGroupDTO processGroupDTO) {
         // if process group does not exist, then create new process group
         if (processGroupDAO.hasProcessGroup(processGroupDTO.getId()) == false) {
             if (parentGroupId == null) {
@@ -609,50 +586,49 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
                 return createProcessGroup(parentGroupId, revision, processGroupDTO);
             }
         }
+        
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessGroupDTO>() {
+            @Override
+            public ProcessGroupDTO execute() {
+                // update the process group
+                ProcessGroup processGroup = processGroupDAO.updateProcessGroup(processGroupDTO);
 
-        // update the process group
-        ProcessGroup processGroup = processGroupDAO.updateProcessGroup(processGroupDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<ProcessGroupDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createProcessGroupDto(processGroup));
-
-        // save updated controller
-        controllerFacade.save();
-
-        return response;
+                // save updated controller
+                controllerFacade.save();
+                
+                return dtoFactory.createProcessGroupDto(processGroup);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<ControllerConfigurationDTO> updateControllerConfiguration(Revision revision, ControllerConfigurationDTO controllerConfigurationDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // update the controller configuration through the proxy
-        if (controllerConfigurationDTO.getName() != null) {
-            controllerFacade.setName(controllerConfigurationDTO.getName());
-        }
-        if (controllerConfigurationDTO.getComments() != null) {
-            controllerFacade.setComments(controllerConfigurationDTO.getComments());
-        }
-        if (controllerConfigurationDTO.getMaxTimerDrivenThreadCount() != null) {
-            controllerFacade.setMaxTimerDrivenThreadCount(controllerConfigurationDTO.getMaxTimerDrivenThreadCount());
-        }
-        if (controllerConfigurationDTO.getMaxEventDrivenThreadCount() != null) {
-            controllerFacade.setMaxEventDrivenThreadCount(controllerConfigurationDTO.getMaxEventDrivenThreadCount());
-        }
-
-        // create the controller configuration dto
-        ControllerConfigurationDTO controllerConfig = getControllerConfiguration();
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<ControllerConfigurationDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), controllerConfig);
+    public ConfigurationSnapshot<ControllerConfigurationDTO> updateControllerConfiguration(final Revision revision, final ControllerConfigurationDTO controllerConfigurationDTO) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ControllerConfigurationDTO>() {
+            @Override
+            public ControllerConfigurationDTO execute() {
+                // update the controller configuration through the proxy
+                if (controllerConfigurationDTO.getName() != null) {
+                    controllerFacade.setName(controllerConfigurationDTO.getName());
+                }
+                if (controllerConfigurationDTO.getComments() != null) {
+                    controllerFacade.setComments(controllerConfigurationDTO.getComments());
+                }
+                if (controllerConfigurationDTO.getMaxTimerDrivenThreadCount() != null) {
+                    controllerFacade.setMaxTimerDrivenThreadCount(controllerConfigurationDTO.getMaxTimerDrivenThreadCount());
+                }
+                if (controllerConfigurationDTO.getMaxEventDrivenThreadCount() != null) {
+                    controllerFacade.setMaxEventDrivenThreadCount(controllerConfigurationDTO.getMaxEventDrivenThreadCount());
+                }
 
-        // save the flow
-        controllerFacade.save();
+                // create the controller configuration dto
+                ControllerConfigurationDTO controllerConfig = getControllerConfiguration();
 
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return controllerConfig;
+            }
+        });
     }
 
     @Override
@@ -685,74 +661,66 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
-    public ConfigurationSnapshot<Void> deleteConnection(Revision revision, String groupId, String connectionId) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        connectionDAO.deleteConnection(groupId, connectionId);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
+    public ConfigurationSnapshot<Void> deleteConnection(final Revision revision, final String groupId, final String connectionId) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>(){
+            @Override
+            public Void execute() {
+                connectionDAO.deleteConnection(groupId, connectionId);
 
-        // save the flow
-        controllerFacade.save();
-
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return null;
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<Void> deleteProcessor(Revision revision, String groupId, String processorId) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // delete the processor and synchronize the connection state
-        processorDAO.deleteProcessor(groupId, processorId);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
+    public ConfigurationSnapshot<Void> deleteProcessor(final Revision revision, final String groupId, final String processorId) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+            @Override
+            public Void execute() {
+                // delete the processor and synchronize the connection state
+                processorDAO.deleteProcessor(groupId, processorId);
 
-        // save the flow
-        controllerFacade.save();
-
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return null;
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<Void> deleteLabel(Revision revision, String groupId, String labelId) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // delete the label
-        labelDAO.deleteLabel(groupId, labelId);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
-
-        // save the flow
-        controllerFacade.save();
+    public ConfigurationSnapshot<Void> deleteLabel(final Revision revision, final String groupId, final String labelId) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+            @Override
+            public Void execute() {
+                // delete the label
+                labelDAO.deleteLabel(groupId, labelId);
 
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return null;
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<Void> deleteFunnel(Revision revision, String groupId, String funnelId) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // delete the label
-        funnelDAO.deleteFunnel(groupId, funnelId);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
-
-        // save the flow
-        controllerFacade.save();
+    public ConfigurationSnapshot<Void> deleteFunnel(final Revision revision, final String groupId, final String funnelId) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+            @Override
+            public Void execute() {
+                // delete the funnel
+                funnelDAO.deleteFunnel(groupId, funnelId);
 
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return null;
+            }
+        });
     }
 
     @Override
@@ -761,95 +729,85 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
-    public ConfigurationSnapshot<Void> deleteSnippet(Revision revision, String snippetId) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // determine if this snippet was linked to the data flow
-        Snippet snippet = snippetDAO.getSnippet(snippetId);
-        boolean linked = snippet.isLinked();
-
-        // delete the snippet
-        snippetDAO.deleteSnippet(snippetId);
+    public ConfigurationSnapshot<Void> deleteSnippet(final Revision revision, final String snippetId) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+            @Override
+            public Void execute() {
+                // determine if this snippet was linked to the data flow
+                Snippet snippet = snippetDAO.getSnippet(snippetId);
+                boolean linked = snippet.isLinked();
 
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
+                // delete the snippet
+                snippetDAO.deleteSnippet(snippetId);
 
-        // save the flow if necessary
-        if (linked) {
-            controllerFacade.save();
-        }
-
-        return response;
+                // save the flow if necessary
+                if (linked) {
+                    controllerFacade.save();
+                }
+                
+                return null;
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<Void> deleteInputPort(Revision revision, String groupId, String inputPortId) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        inputPortDAO.deletePort(groupId, inputPortId);
+    public ConfigurationSnapshot<Void> deleteInputPort(final Revision revision, final String groupId, final String inputPortId) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+            @Override
+            public Void execute() {
+                inputPortDAO.deletePort(groupId, inputPortId);
 
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
-
-        // save the flow
-        controllerFacade.save();
-
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return null;
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<Void> deleteOutputPort(Revision revision, String groupId, String outputPortId) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        outputPortDAO.deletePort(groupId, outputPortId);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
+    public ConfigurationSnapshot<Void> deleteOutputPort(final Revision revision, final String groupId, final String outputPortId) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+            @Override
+            public Void execute() {
+                outputPortDAO.deletePort(groupId, outputPortId);
 
-        // save the flow
-        controllerFacade.save();
-
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return null;
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<Void> deleteProcessGroup(Revision revision, String groupId) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        processGroupDAO.deleteProcessGroup(groupId);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
+    public ConfigurationSnapshot<Void> deleteProcessGroup(final Revision revision, final String groupId) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+            @Override
+            public Void execute() {
+                processGroupDAO.deleteProcessGroup(groupId);
 
-        // save the flow
-        controllerFacade.save();
-
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return null;
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<Void> deleteRemoteProcessGroup(Revision revision, String groupId, String remoteProcessGroupId) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        remoteProcessGroupDAO.deleteRemoteProcessGroup(groupId, remoteProcessGroupId);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
+    public ConfigurationSnapshot<Void> deleteRemoteProcessGroup(final Revision revision, final String groupId, final String remoteProcessGroupId) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+            @Override
+            public Void execute() {
+                remoteProcessGroupDAO.deleteRemoteProcessGroup(groupId, remoteProcessGroupId);
 
-        // save the flow
-        controllerFacade.save();
-
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return null;
+            }
+        });
     }
 
     @Override
@@ -859,97 +817,86 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
-    public ConfigurationSnapshot<ConnectionDTO> createConnection(Revision revision, String groupId, ConnectionDTO connectionDTO) {
-
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // ensure id is set
-        if (StringUtils.isBlank(connectionDTO.getId())) {
-            connectionDTO.setId(UUID.randomUUID().toString());
-        }
-
-        final Connection connection = connectionDAO.createConnection(groupId, connectionDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<ConnectionDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createConnectionDto(connection));
+    public ConfigurationSnapshot<ConnectionDTO> createConnection(final Revision revision, final String groupId, final ConnectionDTO connectionDTO) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ConnectionDTO>() {
+            @Override
+            public ConnectionDTO execute() {
+                // ensure id is set
+                if (StringUtils.isBlank(connectionDTO.getId())) {
+                    connectionDTO.setId(UUID.randomUUID().toString());
+                }
 
-        // save the flow
-        controllerFacade.save();
+                final Connection connection = connectionDAO.createConnection(groupId, connectionDTO);
 
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return dtoFactory.createConnectionDto(connection);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<ProcessorDTO> createProcessor(Revision revision, String groupId, ProcessorDTO processorDTO) {
-
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // ensure id is set
-        if (StringUtils.isBlank(processorDTO.getId())) {
-            processorDTO.setId(UUID.randomUUID().toString());
-        }
-
-        // create the processor
-        final ProcessorNode processor = processorDAO.createProcessor(groupId, processorDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<ProcessorDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createProcessorDto(processor));
+    public ConfigurationSnapshot<ProcessorDTO> createProcessor(final Revision revision, final String groupId, final ProcessorDTO processorDTO) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessorDTO>() {
+            @Override
+            public ProcessorDTO execute() {
+                // ensure id is set
+                if (StringUtils.isBlank(processorDTO.getId())) {
+                    processorDTO.setId(UUID.randomUUID().toString());
+                }
 
-        // save the flow
-        controllerFacade.save();
+                // create the processor
+                final ProcessorNode processor = processorDAO.createProcessor(groupId, processorDTO);
 
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return dtoFactory.createProcessorDto(processor);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<LabelDTO> createLabel(Revision revision, String groupId, LabelDTO labelDTO) {
-
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // ensure id is set
-        if (StringUtils.isBlank(labelDTO.getId())) {
-            labelDTO.setId(UUID.randomUUID().toString());
-        }
-
-        // add the label
-        final Label label = labelDAO.createLabel(groupId, labelDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<LabelDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createLabelDto(label));
+    public ConfigurationSnapshot<LabelDTO> createLabel(final Revision revision, final String groupId, final LabelDTO labelDTO) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<LabelDTO>() {
+            @Override
+            public LabelDTO execute() {
+                // ensure id is set
+                if (StringUtils.isBlank(labelDTO.getId())) {
+                    labelDTO.setId(UUID.randomUUID().toString());
+                }
 
-        // save the flow
-        controllerFacade.save();
+                // add the label
+                final Label label = labelDAO.createLabel(groupId, labelDTO);
 
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return dtoFactory.createLabelDto(label);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<FunnelDTO> createFunnel(Revision revision, String groupId, FunnelDTO funnelDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // ensure id is set
-        if (StringUtils.isBlank(funnelDTO.getId())) {
-            funnelDTO.setId(UUID.randomUUID().toString());
-        }
-
-        // add the label
-        final Funnel funnel = funnelDAO.createFunnel(groupId, funnelDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<FunnelDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createFunnelDto(funnel));
+    public ConfigurationSnapshot<FunnelDTO> createFunnel(final Revision revision, final String groupId, final FunnelDTO funnelDTO) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<FunnelDTO>() {
+            @Override
+            public FunnelDTO execute() {
+                // ensure id is set
+                if (StringUtils.isBlank(funnelDTO.getId())) {
+                    funnelDTO.setId(UUID.randomUUID().toString());
+                }
 
-        // save the flow
-        controllerFacade.save();
+                // add the funnel
+                final Funnel funnel = funnelDAO.createFunnel(groupId, funnelDTO);
 
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return dtoFactory.createFunnelDto(funnel);
+            }
+        });
     }
 
     private void validateSnippetContents(final FlowSnippetDTO flowSnippet, final String groupId) {
@@ -1008,139 +955,129 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
-    public ConfigurationSnapshot<FlowSnippetDTO> copySnippet(Revision revision, String groupId, String snippetId, Double originX, Double originY) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // ensure id is set
-        if (StringUtils.isBlank(snippetId)) {
-            snippetId = UUID.randomUUID().toString();
-        }
-
-        // create the new snippet
-        FlowSnippetDTO flowSnippet = snippetDAO.copySnippet(groupId, snippetId, originX, originY);
-
-        // validate the new snippet
-        validateSnippetContents(flowSnippet, groupId);
+    public ConfigurationSnapshot<FlowSnippetDTO> copySnippet(final Revision revision, final String groupId, final String snippetId, final Double originX, final Double originY) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<FlowSnippetDTO>() {
+            @Override
+            public FlowSnippetDTO execute() {
+                String id = snippetId;
+                
+                // ensure id is set
+                if (StringUtils.isBlank(id)) {
+                    id = UUID.randomUUID().toString();
+                }
 
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<FlowSnippetDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), flowSnippet);
+                // create the new snippet
+                FlowSnippetDTO flowSnippet = snippetDAO.copySnippet(groupId, id, originX, originY);
 
-        // save the flow
-        controllerFacade.save();
+                // validate the new snippet
+                validateSnippetContents(flowSnippet, groupId);
 
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return flowSnippet;
+            }
+        });
     }
 
     @Override
     public ConfigurationSnapshot<SnippetDTO> createSnippet(final Revision revision, final SnippetDTO snippetDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // ensure id is set
-        if (StringUtils.isBlank(snippetDTO.getId())) {
-            snippetDTO.setId(UUID.randomUUID().toString());
-        }
-
-        // add the snippet
-        final Snippet snippet = snippetDAO.createSnippet(snippetDTO);
-        final SnippetDTO responseSnippetDTO = dtoFactory.createSnippetDto(snippet);
-        responseSnippetDTO.setContents(snippetUtils.populateFlowSnippet(snippet, false));
-
-        // create the response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<SnippetDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), responseSnippetDTO);
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<SnippetDTO>() {
+            @Override
+            public SnippetDTO execute() {
+                // ensure id is set
+                if (StringUtils.isBlank(snippetDTO.getId())) {
+                    snippetDTO.setId(UUID.randomUUID().toString());
+                }
 
-        return response;
+                // add the snippet
+                final Snippet snippet = snippetDAO.createSnippet(snippetDTO);
+                final SnippetDTO responseSnippetDTO = dtoFactory.createSnippetDto(snippet);
+                responseSnippetDTO.setContents(snippetUtils.populateFlowSnippet(snippet, false));
+                
+                return responseSnippetDTO;
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<PortDTO> createInputPort(Revision revision, String groupId, PortDTO inputPortDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // ensure id is set
-        if (StringUtils.isBlank(inputPortDTO.getId())) {
-            inputPortDTO.setId(UUID.randomUUID().toString());
-        }
-
-        final Port inputPort = inputPortDAO.createPort(groupId, inputPortDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<PortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createPortDto(inputPort));
+    public ConfigurationSnapshot<PortDTO> createInputPort(final Revision revision, final String groupId, final PortDTO inputPortDTO) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<PortDTO>() {
+            @Override
+            public PortDTO execute() {
+                // ensure id is set
+                if (StringUtils.isBlank(inputPortDTO.getId())) {
+                    inputPortDTO.setId(UUID.randomUUID().toString());
+                }
 
-        // save the flow
-        controllerFacade.save();
+                final Port inputPort = inputPortDAO.createPort(groupId, inputPortDTO);
 
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return dtoFactory.createPortDto(inputPort);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<PortDTO> createOutputPort(Revision revision, String groupId, PortDTO outputPortDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // ensure id is set
-        if (StringUtils.isBlank(outputPortDTO.getId())) {
-            outputPortDTO.setId(UUID.randomUUID().toString());
-        }
-
-        final Port outputPort = outputPortDAO.createPort(groupId, outputPortDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<PortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createPortDto(outputPort));
+    public ConfigurationSnapshot<PortDTO> createOutputPort(final Revision revision, final String groupId, final PortDTO outputPortDTO) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<PortDTO>() {
+            @Override
+            public PortDTO execute() {
+                // ensure id is set
+                if (StringUtils.isBlank(outputPortDTO.getId())) {
+                    outputPortDTO.setId(UUID.randomUUID().toString());
+                }
 
-        // save the flow
-        controllerFacade.save();
+                final Port outputPort = outputPortDAO.createPort(groupId, outputPortDTO);
 
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return dtoFactory.createPortDto(outputPort);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<ProcessGroupDTO> createProcessGroup(String parentGroupId, Revision revision, ProcessGroupDTO processGroupDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // ensure id is set
-        if (StringUtils.isBlank(processGroupDTO.getId())) {
-            processGroupDTO.setId(UUID.randomUUID().toString());
-        }
-
-        final ProcessGroup processGroup = processGroupDAO.createProcessGroup(parentGroupId, processGroupDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<ProcessGroupDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createProcessGroupDto(processGroup));
+    public ConfigurationSnapshot<ProcessGroupDTO> createProcessGroup(final String parentGroupId, final Revision revision, final ProcessGroupDTO processGroupDTO) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessGroupDTO>() {
+            @Override
+            public ProcessGroupDTO execute() {
+                // ensure id is set
+                if (StringUtils.isBlank(processGroupDTO.getId())) {
+                    processGroupDTO.setId(UUID.randomUUID().toString());
+                }
 
-        // save the flow
-        controllerFacade.save();
+                final ProcessGroup processGroup = processGroupDAO.createProcessGroup(parentGroupId, processGroupDTO);
 
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return dtoFactory.createProcessGroupDto(processGroup);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<RemoteProcessGroupDTO> createRemoteProcessGroup(Revision revision, String groupId, RemoteProcessGroupDTO remoteProcessGroupDTO) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
-
-        // ensure id is set
-        if (StringUtils.isBlank(remoteProcessGroupDTO.getId())) {
-            remoteProcessGroupDTO.setId(UUID.randomUUID().toString());
-        }
-
-        final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.createRemoteProcessGroup(groupId, remoteProcessGroupDTO);
-
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<RemoteProcessGroupDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup));
+    public ConfigurationSnapshot<RemoteProcessGroupDTO> createRemoteProcessGroup(final Revision revision, final String groupId, final RemoteProcessGroupDTO remoteProcessGroupDTO) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<RemoteProcessGroupDTO>() {
+            @Override
+            public RemoteProcessGroupDTO execute() {
+                // ensure id is set
+                if (StringUtils.isBlank(remoteProcessGroupDTO.getId())) {
+                    remoteProcessGroupDTO.setId(UUID.randomUUID().toString());
+                }
 
-        // save the flow
-        controllerFacade.save();
+                final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.createRemoteProcessGroup(groupId, remoteProcessGroupDTO);
 
-        return response;
+                // save the flow
+                controllerFacade.save();
+                
+                return dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup);
+            }
+        });
     }
 
     @Override
@@ -1186,74 +1123,217 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
-    public ConfigurationSnapshot<FlowSnippetDTO> createTemplateInstance(Revision revision, String groupId, Double originX, Double originY, String templateId) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
+    public ConfigurationSnapshot<FlowSnippetDTO> createTemplateInstance(final Revision revision, final String groupId, final Double originX, final Double originY, final String templateId) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<FlowSnippetDTO>() {
+            @Override
+            public FlowSnippetDTO execute() {
+                // instantiate the template - there is no need to make another copy of the flow snippet since the actual template
+                // was copied and this dto is only used to instantiate its components (which has already completed)
+                FlowSnippetDTO flowSnippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateId);
 
-        // instantiate the template - there is no need to make another copy of the flow snippet since the actual template
-        // was copied and this dto is only used to instantiate it's components (which as already completed)
-        FlowSnippetDTO flowSnippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateId);
+                // validate the new snippet
+                validateSnippetContents(flowSnippet, groupId);
 
-        // validate the new snippet
-        validateSnippetContents(flowSnippet, groupId);
+                // save the flow
+                controllerFacade.save();
+                
+                return flowSnippet;
+            }
+        });
+    }
 
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<FlowSnippetDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), flowSnippet);
+    @Override
+    public ConfigurationSnapshot<Void> createArchive(final Revision revision) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+            @Override
+            public Void execute() {
+                // create the archive
+                controllerFacade.createArchive();
+                return null;
+            }
+        });
+    }
 
-        // save the flow
-        controllerFacade.save();
+    @Override
+    public ConfigurationSnapshot<ProcessorDTO> setProcessorAnnotationData(final Revision revision, final String processorId, final String annotationData) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessorDTO>() {
+            @Override
+            public ProcessorDTO execute() {
+                // create the processor config
+                final ProcessorConfigDTO config = new ProcessorConfigDTO();
+                config.setAnnotationData(annotationData);
 
-        return response;
+                // create the processor dto
+                final ProcessorDTO processorDTO = new ProcessorDTO();
+                processorDTO.setId(processorId);
+                processorDTO.setConfig(config);
+
+                // get the parent group id for the specified processor
+                String groupId = controllerFacade.findProcessGroupIdForProcessor(processorId);
+
+                // ensure the parent group id was found
+                if (groupId == null) {
+                    throw new ResourceNotFoundException(String.format("Unable to locate Processor with id '%s'.", processorId));
+                }
+
+                // update the processor configuration
+                ProcessorNode processor = processorDAO.updateProcessor(groupId, processorDTO);
+
+                // save the flow
+                controllerFacade.save();
+
+                return dtoFactory.createProcessorDto(processor);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<Void> createArchive(Revision revision) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
+    public ConfigurationSnapshot<ControllerServiceDTO> createControllerService(final Revision revision, final ControllerServiceDTO controllerServiceDTO) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ControllerServiceDTO>() {
+            @Override
+            public ControllerServiceDTO execute() {
+                // ensure id is set
+                if (StringUtils.isBlank(controllerServiceDTO.getId())) {
+                    controllerServiceDTO.setId(UUID.randomUUID().toString());
+                }
 
-        // create the archive
-        controllerFacade.createArchive();
+                // create the controller service
+                final ControllerServiceNode controllerService = controllerServiceDAO.createControllerService(controllerServiceDTO);
 
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
-        return response;
+                // save the update
+                if (properties.isClusterManager()) {
+                    clusterManager.saveControllerServices();
+                } else {
+                    controllerFacade.save();
+                }
+                
+                return dtoFactory.createControllerServiceDto(controllerService);
+            }
+        });
     }
 
     @Override
-    public ConfigurationSnapshot<ProcessorDTO> setProcessorAnnotationData(Revision revision, String processorId, String annotationData) {
-        // ensure the proper revision before performing the update
-        checkRevision(revision);
+    public ConfigurationSnapshot<ControllerServiceDTO> updateControllerService(final Revision revision, final ControllerServiceDTO controllerServiceDTO) {
+        // if controller service does not exist, then create new controller service
+        if (controllerServiceDAO.hasControllerService(controllerServiceDTO.getId()) == false) {
+            return createControllerService(revision, controllerServiceDTO);
+        }
+        
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ControllerServiceDTO>() {
+            @Override
+            public ControllerServiceDTO execute() {
+                final ControllerServiceNode controllerService = controllerServiceDAO.updateControllerService(controllerServiceDTO);
 
-        // create the processor config
-        final ProcessorConfigDTO config = new ProcessorConfigDTO();
-        config.setAnnotationData(annotationData);
+                // save the update
+                if (properties.isClusterManager()) {
+                    clusterManager.saveControllerServices();
+                } else {
+                    controllerFacade.save();
+                }
 
-        // create the processor dto
-        final ProcessorDTO processorDTO = new ProcessorDTO();
-        processorDTO.setId(processorId);
-        processorDTO.setConfig(config);
+                return dtoFactory.createControllerServiceDto(controllerService);
+            }
+        });
+    }
 
-        // get the parent group id for the specified processor
-        String groupId = controllerFacade.findProcessGroupIdForProcessor(processorId);
+    @Override
+    public ConfigurationSnapshot<Set<ControllerServiceReferencingComponentDTO>> updateControllerServiceReferencingComponents(final Revision revision, final String controllerServiceId, final org.apache.nifi.controller.ScheduledState scheduledState, final org.apache.nifi.controller.service.ControllerServiceState controllerServiceState) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Set<ControllerServiceReferencingComponentDTO>>() {
+            @Override
+            public Set<ControllerServiceReferencingComponentDTO> execute() {
+                final ControllerServiceReference reference = controllerServiceDAO.updateControllerServiceReferencingComponents(controllerServiceId, scheduledState, controllerServiceState);
+                return dtoFactory.createControllerServiceReferencingComponentsDto(reference);
+            }
+        });
+    }
 
-        // ensure the parent group id was found
-        if (groupId == null) {
-            throw new ResourceNotFoundException(String.format("Unable to locate Processor with id '%s'.", processorId));
+    @Override
+    public ConfigurationSnapshot<Void> deleteControllerService(final Revision revision, final String controllerServiceId) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+            @Override
+            public Void execute() {
+                // delete the controller service
+                controllerServiceDAO.deleteControllerService(controllerServiceId);
+
+                // save the update
+                if (properties.isClusterManager()) {
+                    clusterManager.saveControllerServices();
+                } else {
+                    controllerFacade.save();
+                }
+                
+                return null;
+            }
+        });
+    }
+
+    @Override
+    public ConfigurationSnapshot<ReportingTaskDTO> createReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ReportingTaskDTO>() {
+            @Override
+            public ReportingTaskDTO execute() {
+                // ensure id is set
+                if (StringUtils.isBlank(reportingTaskDTO.getId())) {
+                    reportingTaskDTO.setId(UUID.randomUUID().toString());
+                }
+
+                // create the reporting task
+                final ReportingTaskNode reportingTask = reportingTaskDAO.createReportingTask(reportingTaskDTO);
+
+                // save the update
+                if (properties.isClusterManager()) {
+                    clusterManager.saveReportingTasks();
+                } else {
+                    controllerFacade.save();
+                }
+                
+                return dtoFactory.createReportingTaskDto(reportingTask);
+            }
+        });
+    }
+
+    @Override
+    public ConfigurationSnapshot<ReportingTaskDTO> updateReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) {
+        // if reporting task does not exist, then create new reporting task
+        if (reportingTaskDAO.hasReportingTask(reportingTaskDTO.getId()) == false) {
+            return createReportingTask(revision, reportingTaskDTO);
         }
+        
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ReportingTaskDTO>() {
+            @Override
+            public ReportingTaskDTO execute() {
+                final ReportingTaskNode reportingTask = reportingTaskDAO.updateReportingTask(reportingTaskDTO);
 
-        // update the processor configuration
-        ProcessorNode processor = processorDAO.updateProcessor(groupId, processorDTO);
+                // save the update
+                if (properties.isClusterManager()) {
+                    clusterManager.saveReportingTasks();
+                } else {
+                    controllerFacade.save();
+                }
 
-        // update the revision and generate a response
-        final Revision updatedRevision = updateRevision(revision);
-        final ConfigurationSnapshot<ProcessorDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createProcessorDto(processor));
+                return dtoFactory.createReportingTaskDto(reportingTask);
+            }
+        });
+    }
 
-        // save the flow
-        controllerFacade.save();
+    @Override
+    public ConfigurationSnapshot<Void> deleteReportingTask(final Revision revision, final String reportingTaskId) {
+        return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
+            @Override
+            public Void execute() {
+                // delete the reporting task
+                reportingTaskDAO.deleteReportingTask(reportingTaskId);
 
-        return response;
+                // save the update
+                if (properties.isClusterManager()) {
+                    clusterManager.saveReportingTasks();
+                } else {
+                    controllerFacade.save();
+                }
+                
+                return null;
+            }
+        });
     }
 
     @Override
@@ -1408,9 +1488,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     // -----------------------------------------
     // Read Operations
     // -----------------------------------------
+    
     @Override
     public RevisionDTO getRevision() {
-        return dtoFactory.createRevisionDTO(optimisticLockingManager.getRevision());
+        return dtoFactory.createRevisionDTO(optimisticLockingManager.getLastModification());
     }
 
     @Override
@@ -1637,6 +1718,16 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
+    public Set<DocumentedTypeDTO> getControllerServiceTypes() {
+        return controllerFacade.getControllerServiceTypes();
+    }
+
+    @Override
+    public Set<DocumentedTypeDTO> getReportingTaskTypes() {
+        return controllerFacade.getReportingTaskTypes();
+    }
+
+    @Override
     public ProcessorDTO getProcessor(String groupId, String id) {
         final ProcessorNode processor = processorDAO.getProcessor(groupId, id);
         final ProcessorDTO processorDto = dtoFactory.createProcessorDto(processor);
@@ -1644,6 +1735,19 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
+    public PropertyDescriptorDTO getProcessorPropertyDescriptor(String groupId, String id, String property) {
+        final ProcessorNode processor = processorDAO.getProcessor(groupId, id);
+        PropertyDescriptor descriptor = processor.getPropertyDescriptor(property);
+        
+        // return an invalid descriptor if the processor doesn't support this property
+        if (descriptor == null) {
+            descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build();
+        }
+        
+        return dtoFactory.createPropertyDescriptorDto(descriptor);
+    }
+
+    @Override
     public StatusHistoryDTO getProcessorStatusHistory(String groupId, String id) {
         return controllerFacade.getProcessorStatusHistory(groupId, id);
     }
@@ -1823,6 +1927,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
 
         final Date now = new Date();
         controllerConfig.setTimeOffset(TimeZone.getDefault().getOffset(now.getTime()));
+        controllerConfig.setCurrentTime(now);
 
         // determine the site to site configuration
         if (isClustered()) {
@@ -1929,12 +2034,72 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     @Override
     public ConfigurationSnapshot<ProcessGroupDTO> getProcessGroup(String groupId, final boolean recurse) {
         ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
-        Long version = optimisticLockingManager.getRevision().getVersion();
-        ConfigurationSnapshot<ProcessGroupDTO> response = new ConfigurationSnapshot<>(version, dtoFactory.createProcessGroupDto(processGroup, recurse));
+        Revision revision = optimisticLockingManager.getLastModification().getRevision();
+        ConfigurationSnapshot<ProcessGroupDTO> response = new ConfigurationSnapshot<>(revision.getVersion(), dtoFactory.createProcessGroupDto(processGroup, recurse));
         return response;
     }
 
     @Override
+    public Set<ControllerServiceDTO> getControllerServices() {
+        final Set<ControllerServiceDTO> controllerServiceDtos = new LinkedHashSet<>();
+        for (ControllerServiceNode controllerService : controllerServiceDAO.getControllerServices()) {
+            controllerServiceDtos.add(dtoFactory.createControllerServiceDto(controllerService));
+        }
+        return controllerServiceDtos;
+    }
+
+    @Override
+    public ControllerServiceDTO getControllerService(String controllerServiceId) {
+        return dtoFactory.createControllerServiceDto(controllerServiceDAO.getControllerService(controllerServiceId));
+    }
+
+    @Override
+    public PropertyDescriptorDTO getControllerServicePropertyDescriptor(String id, String property) {
+        final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(id);
+        PropertyDescriptor descriptor = controllerService.getControllerServiceImplementation().getPropertyDescriptor(property);
+        
+        // return an invalid descriptor if the controller service doesn't support this property
+        if (descriptor == null) {
+            descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build();
+        }
+        
+        return dtoFactory.createPropertyDescriptorDto(descriptor);
+    }
+    
+    @Override
+    public Set<ControllerServiceReferencingComponentDTO> getControllerServiceReferencingComponents(String controllerServiceId) {
+        final ControllerServiceNode service = controllerServiceDAO.getControllerService(controllerServiceId);
+        return dtoFactory.createControllerServiceReferencingComponentsDto(service.getReferences());
+    }
+
+    @Override
+    public Set<ReportingTaskDTO> getReportingTasks() {
+        final Set<ReportingTaskDTO> reportingTaskDtos = new LinkedHashSet<>();
+        for (ReportingTaskNode reportingTask : reportingTaskDAO.getReportingTasks()) {
+            reportingTaskDtos.add(dtoFactory.createReportingTaskDto(reportingTask));
+        }
+        return reportingTaskDtos;
+    }
+
+    @Override
+    public ReportingTaskDTO getReportingTask(String reportingTaskId) {
+        return dtoFactory.createReportingTaskDto(reportingTaskDAO.getReportingTask(reportingTaskId));
+    }
+
+    @Override
+    public PropertyDescriptorDTO getReportingTaskPropertyDescriptor(String id, String property) {
+        final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(id);
+        PropertyDescriptor descriptor = reportingTask.getReportingTask().getPropertyDescriptor(property);
+        
+        // return an invalid descriptor if the reporting task doesn't support this property
+        if (descriptor == null) {
+            descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build();
+        }
+        
+        return dtoFactory.createPropertyDescriptorDto(descriptor);
+    }
+    
+    @Override
     public StatusHistoryDTO getProcessGroupStatusHistory(String groupId) {
         return controllerFacade.getProcessGroupStatusHistory(groupId);
     }
@@ -1974,9 +2139,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
-    public ProcessorHistoryDTO getProcessorHistory(String processorId) {
+    public ComponentHistoryDTO getComponentHistory(String componentId) {
         final Map<String, PropertyHistoryDTO> propertyHistoryDtos = new LinkedHashMap<>();
-        final Map<String, List<PreviousValue>> propertyHistory = auditService.getPreviousValues(processorId);
+        final Map<String, List<PreviousValue>> propertyHistory = auditService.getPreviousValues(componentId);
 
         for (final Map.Entry<String, List<PreviousValue>> entry : propertyHistory.entrySet()) {
             final List<PreviousValueDTO> previousValueDtos = new ArrayList<>();
@@ -1996,8 +2161,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
             }
         }
 
-        final ProcessorHistoryDTO history = new ProcessorHistoryDTO();
-        history.setProcessorId(processorId);
+        final ComponentHistoryDTO history = new ComponentHistoryDTO();
+        history.setComponentId(componentId);
         history.setPropertyHistory(propertyHistoryDtos);
 
         return history;
@@ -2718,6 +2883,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         this.processGroupDAO = processGroupDAO;
     }
 
+    public void setControllerServiceDAO(ControllerServiceDAO controllerServiceDAO) {
+        this.controllerServiceDAO = controllerServiceDAO;
+    }
+
+    public void setReportingTaskDAO(ReportingTaskDAO reportingTaskDAO) {
+        this.reportingTaskDAO = reportingTaskDAO;
+    }
+
     public void setTemplateDAO(TemplateDAO templateDAO) {
         this.templateDAO = templateDAO;
     }